mirror of
https://github.com/tailscale/tailscale.git
synced 2025-02-20 11:58:39 +00:00
ipn/ipnlocal: do not process old status messages received out of order
When `setWgengineStatus` is invoked concurrently from multiple goroutines, it is possible that the call invoked with a newer status is processed before a call with an older status. e.g. a status that has endpoints might be followed by a status without endpoints. This causes unnecessary work in the engine and can result in packet loss. This patch adds an `AsOf time.Time` field to the status to specifiy when the status was calculated, which later allows `setWgengineStatus` to ignore any status messages it receives that are older than the one it has already processed. Updates tailscale/corp#2579 Signed-off-by: Maisem Ali <maisem@tailscale.com>
This commit is contained in:
parent
753f1bfad4
commit
6fecc16c3b
@ -140,6 +140,7 @@ type LocalBackend struct {
|
|||||||
peerAPIListeners []*peerAPIListener
|
peerAPIListeners []*peerAPIListener
|
||||||
loginFlags controlclient.LoginFlags
|
loginFlags controlclient.LoginFlags
|
||||||
incomingFiles map[*incomingFile]bool
|
incomingFiles map[*incomingFile]bool
|
||||||
|
lastStatusTime time.Time // status.AsOf value of the last processed status update
|
||||||
// directFileRoot, if non-empty, means to write received files
|
// directFileRoot, if non-empty, means to write received files
|
||||||
// directly to this directory, without staging them in an
|
// directly to this directory, without staging them in an
|
||||||
// intermediate buffered directory for "pick-up" later. If
|
// intermediate buffered directory for "pick-up" later. If
|
||||||
@ -706,6 +707,13 @@ func (b *LocalBackend) setWgengineStatus(s *wgengine.Status, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
if s.AsOf.Before(b.lastStatusTime) {
|
||||||
|
// Don't process a status update that is older than the one we have
|
||||||
|
// already processed. (corp#2579)
|
||||||
|
b.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.lastStatusTime = s.AsOf
|
||||||
es := b.parseWgStatusLocked(s)
|
es := b.parseWgStatusLocked(s)
|
||||||
cc := b.cc
|
cc := b.cc
|
||||||
b.engineStatus = es
|
b.engineStatus = es
|
||||||
|
@ -62,6 +62,7 @@ func (nt *notifyThrottler) put(n ipn.Notify) {
|
|||||||
// drain pulls the notifications out of the queue, asserting that there are
|
// drain pulls the notifications out of the queue, asserting that there are
|
||||||
// exactly count notifications that have been put so far.
|
// exactly count notifications that have been put so far.
|
||||||
func (nt *notifyThrottler) drain(count int) []ipn.Notify {
|
func (nt *notifyThrottler) drain(count int) []ipn.Notify {
|
||||||
|
nt.t.Helper()
|
||||||
nt.mu.Lock()
|
nt.mu.Lock()
|
||||||
ch := nt.ch
|
ch := nt.ch
|
||||||
nt.mu.Unlock()
|
nt.mu.Unlock()
|
||||||
@ -923,7 +924,7 @@ func TestStateMachine(t *testing.T) {
|
|||||||
}
|
}
|
||||||
notifies.expect(1)
|
notifies.expect(1)
|
||||||
// Fake a DERP connection.
|
// Fake a DERP connection.
|
||||||
b.setWgengineStatus(&wgengine.Status{DERPs: 1}, nil)
|
b.setWgengineStatus(&wgengine.Status{DERPs: 1, AsOf: time.Now()}, nil)
|
||||||
{
|
{
|
||||||
nn := notifies.drain(1)
|
nn := notifies.drain(1)
|
||||||
cc.assertCalls("unpause")
|
cc.assertCalls("unpause")
|
||||||
@ -1016,7 +1017,7 @@ func TestWGEngineStatusRace(t *testing.T) {
|
|||||||
if i == 0 {
|
if i == 0 {
|
||||||
n = 1
|
n = 1
|
||||||
}
|
}
|
||||||
b.setWgengineStatus(&wgengine.Status{DERPs: n}, nil)
|
b.setWgengineStatus(&wgengine.Status{AsOf: time.Now(), DERPs: n}, nil)
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -1113,6 +1113,7 @@ func (e *userspaceEngine) getStatus() (*Status, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Status{
|
return &Status{
|
||||||
|
AsOf: time.Now(),
|
||||||
LocalAddrs: append([]tailcfg.Endpoint(nil), e.endpoints...),
|
LocalAddrs: append([]tailcfg.Endpoint(nil), e.endpoints...),
|
||||||
Peers: peers,
|
Peers: peers,
|
||||||
DERPs: derpConns,
|
DERPs: derpConns,
|
||||||
|
@ -6,6 +6,7 @@ package wgengine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"inet.af/netaddr"
|
"inet.af/netaddr"
|
||||||
"tailscale.com/ipn/ipnstate"
|
"tailscale.com/ipn/ipnstate"
|
||||||
@ -23,6 +24,7 @@ import (
|
|||||||
//
|
//
|
||||||
// TODO(bradfitz): remove this, subset of ipnstate? Need to migrate users.
|
// TODO(bradfitz): remove this, subset of ipnstate? Need to migrate users.
|
||||||
type Status struct {
|
type Status struct {
|
||||||
|
AsOf time.Time // the time at which the status was calculated
|
||||||
Peers []ipnstate.PeerStatusLite
|
Peers []ipnstate.PeerStatusLite
|
||||||
LocalAddrs []tailcfg.Endpoint // the set of possible endpoints for the magic conn
|
LocalAddrs []tailcfg.Endpoint // the set of possible endpoints for the magic conn
|
||||||
DERPs int // number of active DERP connections
|
DERPs int // number of active DERP connections
|
||||||
|
Loading…
x
Reference in New Issue
Block a user