control/controlclient: refactor in prep for optimized delta handling

See issue. This is a baby step towards passing through deltas
end-to-end from node to control back to node and down to the various
engine subsystems, not computing diffs from two full netmaps at
various levels. This will then let us support larger netmaps without
burning CPU.

But this change itself changes no behavior. It just changes a func
type to an interface with one method. That paves the way for future
changes to then add new NetmapUpdater methods that do more
fine-grained work than updating the whole world.

Updates #1909

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
Brad Fitzpatrick 2023-08-12 08:18:10 -07:00 committed by Brad Fitzpatrick
parent 4940a718a1
commit 2398993804
2 changed files with 71 additions and 43 deletions

View File

@ -480,11 +480,42 @@ func (c *Auto) unpausedChanLocked() <-chan struct{} {
return unpaused return unpaused
} }
// mapRoutineState is the state of Auto.mapRoutine while it's running.
type mapRoutineState struct {
c *Auto
bo *backoff.Backoff
}
func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) {
c := mrs.c
health.SetInPollNetMap(true)
c.mu.Lock()
ctx := c.mapCtx
c.synced = true
if c.loggedIn {
c.state = StateSynchronized
}
c.expiry = ptr.To(nm.Expiry)
stillAuthed := c.loggedIn
c.logf("[v1] mapRoutine: netmap received: %s", c.state)
c.mu.Unlock()
if stillAuthed {
c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
}
// Reset the backoff timer if we got a netmap.
mrs.bo.BackOff(ctx, nil)
}
// mapRoutine is responsible for keeping a read-only streaming connection to the // mapRoutine is responsible for keeping a read-only streaming connection to the
// control server, and keeping the netmap up to date. // control server, and keeping the netmap up to date.
func (c *Auto) mapRoutine() { func (c *Auto) mapRoutine() {
defer close(c.mapDone) defer close(c.mapDone)
bo := backoff.NewBackoff("mapRoutine", c.logf, 30*time.Second) mrs := &mapRoutineState{
c: c,
bo: backoff.NewBackoff("mapRoutine", c.logf, 30*time.Second),
}
for { for {
if err := c.waitUnpause("mapRoutine"); err != nil { if err := c.waitUnpause("mapRoutine"); err != nil {
@ -531,25 +562,7 @@ func (c *Auto) mapRoutine() {
} else { } else {
health.SetInPollNetMap(false) health.SetInPollNetMap(false)
err := c.direct.PollNetMap(ctx, func(nm *netmap.NetworkMap) { err := c.direct.PollNetMap(ctx, mrs)
health.SetInPollNetMap(true)
c.mu.Lock()
c.synced = true
if c.loggedIn {
c.state = StateSynchronized
}
c.expiry = ptr.To(nm.Expiry)
stillAuthed := c.loggedIn
c.logf("[v1] mapRoutine: netmap received: %s", c.state)
c.mu.Unlock()
if stillAuthed {
c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
}
// Reset the backoff timer if we got a netmap.
bo.BackOff(ctx, nil)
})
health.SetInPollNetMap(false) health.SetInPollNetMap(false)
c.mu.Lock() c.mu.Lock()
@ -561,16 +574,14 @@ func (c *Auto) mapRoutine() {
c.mu.Unlock() c.mu.Unlock()
if paused { if paused {
mrs.bo.BackOff(ctx, nil)
c.logf("mapRoutine: paused") c.logf("mapRoutine: paused")
continue continue
} }
if err != nil { report(err, "PollNetMap")
report(err, "PollNetMap") mrs.bo.BackOff(ctx, err)
bo.BackOff(ctx, err) continue
continue
}
bo.BackOff(ctx, nil)
} }
} }
} }

View File

@ -180,6 +180,16 @@ type Decompressor interface {
Close() Close()
} }
// NetmapUpdater is the interface needed by the controlclient to enact change in
// the world as a function of updates received from the network.
type NetmapUpdater interface {
UpdateFullNetmap(*netmap.NetworkMap)
// TODO(bradfitz): add methods to do fine-grained updates, mutating just
// parts of peers, without implementations of NetmapUpdater needing to do
// the diff themselves between the previous full & next full network maps.
}
// NewDirect returns a new Direct client. // NewDirect returns a new Direct client.
func NewDirect(opts Options) (*Direct, error) { func NewDirect(opts Options) (*Direct, error) {
if opts.ServerURL == "" { if opts.ServerURL == "" {
@ -767,24 +777,31 @@ func (c *Direct) SetEndpoints(endpoints []tailcfg.Endpoint) (changed bool) {
return c.newEndpoints(endpoints) return c.newEndpoints(endpoints)
} }
// PollNetMap makes a /map request to download the network map, calling cb with // PollNetMap makes a /map request to download the network map, calling
// each new netmap. // NetmapUpdater on each update from the control plane.
// It always returns a non-nil error describing the reason for the failure //
// or why the request ended. // It always returns a non-nil error describing the reason for the failure or
func (c *Direct) PollNetMap(ctx context.Context, cb func(*netmap.NetworkMap)) error { // why the request ended.
return c.sendMapRequest(ctx, true, cb) func (c *Direct) PollNetMap(ctx context.Context, nu NetmapUpdater) error {
return c.sendMapRequest(ctx, true, nu)
}
type rememberLastNetmapUpdater struct {
last *netmap.NetworkMap
}
func (nu *rememberLastNetmapUpdater) UpdateFullNetmap(nm *netmap.NetworkMap) {
nu.last = nm
} }
// FetchNetMapForTest fetches the netmap once. // FetchNetMapForTest fetches the netmap once.
func (c *Direct) FetchNetMapForTest(ctx context.Context) (*netmap.NetworkMap, error) { func (c *Direct) FetchNetMapForTest(ctx context.Context) (*netmap.NetworkMap, error) {
var ret *netmap.NetworkMap var nu rememberLastNetmapUpdater
err := c.sendMapRequest(ctx, false, func(nm *netmap.NetworkMap) { err := c.sendMapRequest(ctx, false, &nu)
ret = nm if err == nil && nu.last == nil {
})
if err == nil && ret == nil {
return nil, errors.New("[unexpected] sendMapRequest success without callback") return nil, errors.New("[unexpected] sendMapRequest success without callback")
} }
return ret, err return nu.last, err
} }
// SendUpdate makes a /map request to update the server of our latest state, but // SendUpdate makes a /map request to update the server of our latest state, but
@ -805,8 +822,8 @@ func (c *Direct) SendUpdate(ctx context.Context) error {
// and as such always returns a non-nil error. // and as such always returns a non-nil error.
// //
// If cb is nil, OmitPeers will be set to true. // If cb is nil, OmitPeers will be set to true.
func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, cb func(*netmap.NetworkMap)) error { func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
if isStreaming && cb == nil { if isStreaming && nu == nil {
panic("cb must be non-nil if isStreaming is true") panic("cb must be non-nil if isStreaming is true")
} }
@ -868,7 +885,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, cb func(*
Stream: isStreaming, Stream: isStreaming,
Hostinfo: hi, Hostinfo: hi,
DebugFlags: c.debugFlags, DebugFlags: c.debugFlags,
OmitPeers: cb == nil, OmitPeers: nu == nil,
TKAHead: c.tkaHead, TKAHead: c.tkaHead,
} }
var extraDebugFlags []string var extraDebugFlags []string
@ -939,7 +956,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, cb func(*
health.NoteMapRequestHeard(request) health.NoteMapRequestHeard(request)
if cb == nil { if nu == nil {
io.Copy(io.Discard, res.Body) io.Copy(io.Discard, res.Body)
return nil return nil
} }
@ -1135,7 +1152,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, cb func(*
c.expiry = &nm.Expiry c.expiry = &nm.Expiry
c.mu.Unlock() c.mu.Unlock()
cb(nm) nu.UpdateFullNetmap(nm)
} }
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()