diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index 31481d50c..c3db722f5 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -828,7 +828,7 @@ func (c *Direct) SendUpdate(ctx context.Context) error { // if the context expires or the server returns an error/closes the connection // and as such always returns a non-nil error. // -// If cb is nil, OmitPeers will be set to true. +// If nu is nil, OmitPeers will be set to true. func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error { if isStreaming && nu == nil { panic("cb must be non-nil if isStreaming is true") @@ -992,7 +992,27 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap } c.expiry = nm.Expiry } - sess.StartWatchdog() + + // Create a watchdog timer that breaks the connection if we don't receive a + // MapResponse from the network at least once every two minutes. The + // watchdog timer is stopped every time we receive a MapResponse (so it + // doesn't run when we're processing a MapResponse message, including any + // long-running requested operations like Debug.Sleep) and is reset whenever + // we go back to blocking on network reads. + watchdogTimer, watchdogTimedOut := c.clock.NewTimer(watchdogTimeout) + defer watchdogTimer.Stop() + + go func() { + select { + case <-ctx.Done(): + vlogf("netmap: ending timeout goroutine") + return + case <-watchdogTimedOut: + c.logf("map response long-poll timed out!") + cancel() + return + } + }() // gotNonKeepAliveMessage is whether we've yet received a MapResponse message without // KeepAlive set. @@ -1006,6 +1026,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap // We can use this same read loop either way. var msg []byte for mapResIdx := 0; mapResIdx == 0 || isStreaming; mapResIdx++ { + watchdogTimer.Reset(watchdogTimeout) vlogf("netmap: starting size read after %v (poll %v)", time.Since(t0).Round(time.Millisecond), mapResIdx) var siz [4]byte if _, err := io.ReadFull(res.Body, siz[:]); err != nil { @@ -1026,6 +1047,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap vlogf("netmap: decode error: %v") return err } + watchdogTimer.Stop() metricMapResponseMessages.Add(1) @@ -1068,14 +1090,6 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap c.logf("netmap: [unexpected] new dial plan; nowhere to store it") } } - - select { - case sess.watchdogReset <- struct{}{}: - vlogf("netmap: sent timer reset") - case <-ctx.Done(): - c.logf("[v1] netmap: not resetting timer; context done: %v", ctx.Err()) - return ctx.Err() - } if resp.KeepAlive { metricMapResponseKeepAlives.Add(1) continue @@ -1102,7 +1116,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap return nil } -func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, watchdogReset chan<- struct{}) error { +func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug) error { if code := debug.Exit; code != nil { c.logf("exiting process with status %v per controlplane", *code) os.Exit(*code) @@ -1112,7 +1126,7 @@ func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, w envknob.SetNoLogsNoSupport() } if sleep := time.Duration(debug.SleepSeconds * float64(time.Second)); sleep > 0 { - if err := sleepAsRequested(ctx, c.logf, watchdogReset, sleep, c.clock); err != nil { + if err := sleepAsRequested(ctx, c.logf, sleep, c.clock); err != nil { return err } } @@ -1444,7 +1458,7 @@ func answerC2NPing(logf logger.Logf, c2nHandler http.Handler, c *http.Client, pr // that the client sleep. The complication is that while we're sleeping (if for // a long time), we need to periodically reset the watchdog timer before it // expires. -func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan<- struct{}, d time.Duration, clock tstime.Clock) error { +func sleepAsRequested(ctx context.Context, logf logger.Logf, d time.Duration, clock tstime.Clock) error { const maxSleep = 5 * time.Minute if d > maxSleep { logf("sleeping for %v, capped from server-requested %v ...", maxSleep, d) @@ -1453,25 +1467,13 @@ func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan< logf("sleeping for server-requested %v ...", d) } - ticker, tickerChannel := clock.NewTicker(watchdogTimeout / 2) - defer ticker.Stop() timer, timerChannel := clock.NewTimer(d) defer timer.Stop() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-timerChannel: - return nil - case <-tickerChannel: - select { - case watchdogReset <- struct{}{}: - case <-timerChannel: - return nil - case <-ctx.Done(): - return ctx.Err() - } - } + select { + case <-ctx.Done(): + return ctx.Err() + case <-timerChannel: + return nil } } diff --git a/control/controlclient/map.go b/control/controlclient/map.go index 6c0e006a5..8f0d4cac2 100644 --- a/control/controlclient/map.go +++ b/control/controlclient/map.go @@ -49,7 +49,6 @@ type mapSession struct { machinePubKey key.MachinePublic altClock tstime.Clock // if nil, regular time is used cancel context.CancelFunc // always non-nil, shuts down caller's base long poll context - watchdogReset chan struct{} // send to request that the long poll activity watchdog timeout be reset // sessionAliveCtx is a Background-based context that's alive for the // duration of the mapSession that we own the lifetime of. It's closed by @@ -57,12 +56,12 @@ type mapSession struct { sessionAliveCtx context.Context sessionAliveCtxClose context.CancelFunc // closes sessionAliveCtx - // Optional hooks, set once before use. + // Optional hooks, guaranteed non-nil (set to no-op funcs) by the + // newMapSession constructor. They must be overridden if desired + // before the mapSession is used. // onDebug specifies what to do with a *tailcfg.Debug message. - // If the watchdogReset chan is nil, it's not used. Otherwise it can be sent to - // to request that the long poll activity watchdog timeout be reset. - onDebug func(_ context.Context, _ *tailcfg.Debug, watchdogReset chan<- struct{}) error + onDebug func(context.Context, *tailcfg.Debug) error // onSelfNodeChanged is called before the NetmapUpdater if the self node was // changed. @@ -102,13 +101,12 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob publicNodeKey: privateNodeKey.Public(), lastDNSConfig: new(tailcfg.DNSConfig), lastUserProfile: map[tailcfg.UserID]tailcfg.UserProfile{}, - watchdogReset: make(chan struct{}), // Non-nil no-op defaults, to be optionally overridden by the caller. logf: logger.Discard, vlogf: logger.Discard, cancel: func() {}, - onDebug: func(context.Context, *tailcfg.Debug, chan<- struct{}) error { return nil }, + onDebug: func(context.Context, *tailcfg.Debug) error { return nil }, onSelfNodeChanged: func(*netmap.NetworkMap) {}, } ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background()) @@ -133,38 +131,6 @@ func (ms *mapSession) clock() tstime.Clock { return cmpx.Or[tstime.Clock](ms.altClock, tstime.StdClock{}) } -// StartWatchdog starts the session's watchdog timer. -// If there's no activity in too long, it tears down the connection. -// Call Close to release these resources. -func (ms *mapSession) StartWatchdog() { - timer, timedOutChan := ms.clock().NewTimer(watchdogTimeout) - go func() { - defer timer.Stop() - for { - select { - case <-ms.sessionAliveCtx.Done(): - ms.vlogf("netmap: ending timeout goroutine") - return - case <-timedOutChan: - ms.logf("map response long-poll timed out!") - ms.cancel() - return - case <-ms.watchdogReset: - if !timer.Stop() { - select { - case <-timedOutChan: - case <-ms.sessionAliveCtx.Done(): - ms.vlogf("netmap: ending timeout goroutine") - return - } - } - ms.vlogf("netmap: reset timeout timer") - timer.Reset(watchdogTimeout) - } - } - }() -} - func (ms *mapSession) Close() { ms.sessionAliveCtxClose() } @@ -179,7 +145,7 @@ func (ms *mapSession) Close() { // is [re]factoring progress enough. func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { if debug := resp.Debug; debug != nil { - if err := ms.onDebug(ctx, debug, ms.watchdogReset); err != nil { + if err := ms.onDebug(ctx, debug); err != nil { return err } }