wgengine/magicsock: fix data race regression in disco ping callbacks
Regression from c15997511de3. The callback could be run multiple times
from different endpoints.

Fixes #9801

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
commit a6270826a3
parent 5297bd2cff
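The shape of the bug, as a minimal standalone Go sketch (the names pingResult, onReply, and the endpoint list are illustrative, not the magicsock code): a CLI ping shared one result object and one callback across every disco ping it sent, so when several endpoints answered, each pong handler could write the shared result and invoke the callback again, concurrently.

package main

import (
	"fmt"
	"sync"
)

// pingResult stands in for *ipnstate.PingResult in this sketch.
type pingResult struct{ LatencySeconds float64 }

func main() {
	res := &pingResult{} // one result shared by every ping sent for this CLI request
	onReply := func(r *pingResult) { fmt.Println("latency:", r.LatencySeconds) }

	endpoints := []string{"10.0.0.2:41641", "[fd7a::1]:41641", "derp-1"}

	var wg sync.WaitGroup
	for i := range endpoints {
		wg.Add(1)
		go func(i int) { // a pong arriving for the ping sent to endpoints[i]
			defer wg.Done()
			res.LatencySeconds = float64(i+1) / 1000 // racy write to the shared result...
			onReply(res)                             // ...and the callback can run several times
		}(i)
	}
	wg.Wait()
}

Run under go run -race, the concurrent writes to res are reported. The diff below wraps res and cb in a pingResultAndCallback holder whose reply() hands ownership to exactly one replier.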
@@ -105,9 +105,8 @@ type sentPing struct {
 	at      mono.Time
 	timer   *time.Timer // timeout timer
 	purpose discoPingPurpose
 	size    int // size of the disco message
-	res     *ipnstate.PingResult       // nil unless CLI ping
-	cb      func(*ipnstate.PingResult) // nil unless CLI ping
+	resCB   *pingResultAndCallback // or nil for internal use
 }
 
 // endpointState is some state and history for a specific endpoint of
@@ -431,7 +430,7 @@ func (de *endpoint) heartbeat() {
 	udpAddr, _, _ := de.addrForSendLocked(now)
 	if udpAddr.IsValid() {
 		// We have a preferred path. Ping that every 2 seconds.
-		de.startDiscoPingLocked(udpAddr, now, pingHeartbeat, 0, nil, nil)
+		de.startDiscoPingLocked(udpAddr, now, pingHeartbeat, 0, nil)
 	}
 
 	if de.wantFullPingLocked(now) {
@@ -475,9 +474,20 @@ func (de *endpoint) noteActiveLocked() {
 // can send - the maximum packet size minus the IPv4 and UDP headers.
 var MaxDiscoPingSize = tstun.MaxPacketSize - 20 - 8
 
-// cliPing starts a ping for the "tailscale ping" command. res is value to call cb with,
-// already partially filled.
-func (de *endpoint) cliPing(res *ipnstate.PingResult, size int, cb func(*ipnstate.PingResult)) {
+type pingResultAndCallback struct {
+	taken atomic.Bool // first CompareAndSwap from false to true takes ownership of res
+	res   *ipnstate.PingResult
+	cb    func(*ipnstate.PingResult)
+}
+
+func (p *pingResultAndCallback) reply() bool {
+	return p != nil && p.taken.CompareAndSwap(false, true)
+}
+
+// discoPing starts a disco-level ping for the "tailscale ping" command (or other
+// callers, such as c2n). res is value to call cb with, already partially
+// filled. cb must be called at most once. Once called, ownership of res passes to cb.
+func (de *endpoint) discoPing(res *ipnstate.PingResult, size int, cb func(*ipnstate.PingResult)) {
 	de.mu.Lock()
 	defer de.mu.Unlock()
 
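A standalone sketch of the take-once pattern that the new pingResultAndCallback.reply uses: whichever replier wins the atomic.Bool CompareAndSwap(false, true) owns the result and runs the callback; every later replier gets false and does nothing. The resultTaker and deliver names below are illustrative, not part of the package.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type pingResult struct{ LatencySeconds float64 }

// resultTaker is an illustrative analogue of pingResultAndCallback:
// the caller that wins the CompareAndSwap owns res and runs cb.
type resultTaker struct {
	taken atomic.Bool
	res   *pingResult
	cb    func(*pingResult)
}

// deliver reports whether this caller took ownership; only the winner
// may write res and invoke cb, so cb runs at most once.
func (t *resultTaker) deliver(latency float64) bool {
	if t == nil || !t.taken.CompareAndSwap(false, true) {
		return false // another replier already took it, or nobody is listening
	}
	t.res.LatencySeconds = latency
	t.cb(t.res)
	return true
}

func main() {
	t := &resultTaker{
		res: &pingResult{},
		cb:  func(r *pingResult) { fmt.Println("latency:", r.LatencySeconds) },
	}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three pongs racing back from different endpoints
		wg.Add(1)
		go func(ms int) {
			defer wg.Done()
			t.deliver(float64(ms) / 1000)
		}(i + 1)
	}
	wg.Wait() // exactly one "latency: ..." line prints, with no data race
}

The remaining hunks thread this holder through startDiscoPingLocked and consult reply() in handlePongConnLocked before touching res or cb.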
@@ -492,21 +502,23 @@ func (de *endpoint) cliPing(res *ipnstate.PingResult, size int, cb func(*ipnstat
 		return
 	}
 
+	resCB := &pingResultAndCallback{res: res, cb: cb}
+
 	now := mono.Now()
 	udpAddr, derpAddr := de.addrForPingSizeLocked(now, size)
 
 	if derpAddr.IsValid() {
-		de.startDiscoPingLocked(derpAddr, now, pingCLI, size, res, cb)
+		de.startDiscoPingLocked(derpAddr, now, pingCLI, size, resCB)
 	}
 	if udpAddr.IsValid() && now.Before(de.trustBestAddrUntil) {
 		// Already have an active session, so just ping the address we're using.
 		// Otherwise "tailscale ping" results to a node on the local network
 		// can look like they're bouncing between, say 10.0.0.0/9 and the peer's
 		// IPv6 address, both 1ms away, and it's random who replies first.
-		de.startDiscoPingLocked(udpAddr, now, pingCLI, size, res, cb)
+		de.startDiscoPingLocked(udpAddr, now, pingCLI, size, resCB)
 	} else {
 		for ep := range de.endpointState {
-			de.startDiscoPingLocked(ep, now, pingCLI, size, res, cb)
+			de.startDiscoPingLocked(ep, now, pingCLI, size, resCB)
 		}
 	}
 	de.noteActiveLocked()
@@ -660,10 +672,11 @@ const (
 	pingCLI
 )
 
-// startDiscoPingLocked sends a disco ping to ep in a separate
-// goroutine. res and cb are for returning the results of CLI pings,
-// otherwise they are nil.
-func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpose discoPingPurpose, size int, res *ipnstate.PingResult, cb func(*ipnstate.PingResult)) {
+// startDiscoPingLocked sends a disco ping to ep in a separate goroutine. resCB,
+// if non-nil, means that a caller external to the magicsock package internals
+// is interested in the result (such as a CLI "tailscale ping" or a c2n ping
+// request, etc)
+func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpose discoPingPurpose, size int, resCB *pingResultAndCallback) {
 	if runtime.GOOS == "js" {
 		return
 	}
@@ -710,8 +723,7 @@ func (de *endpoint) startDiscoPingLocked(ep netip.AddrPort, now mono.Time, purpo
 		at:      now,
 		timer:   time.AfterFunc(pingTimeoutDuration, func() { de.discoPingTimeout(txid) }),
 		purpose: purpose,
-		res:     res,
-		cb:      cb,
+		resCB:   resCB,
 		size:    s,
 	}
 	go de.sendDiscoPing(ep, epDisco.key, txid, s, logLevel)
@@ -742,7 +754,7 @@ func (de *endpoint) sendDiscoPingsLocked(now mono.Time, sendCallMeMaybe bool) {
 			de.c.dlogf("[v1] magicsock: disco: send, starting discovery for %v (%v)", de.publicKey.ShortString(), de.discoShort())
 		}
 
-		de.startDiscoPingLocked(ep, now, pingDiscovery, 0, nil, nil)
+		de.startDiscoPingLocked(ep, now, pingDiscovery, 0, nil)
 	}
 	derpAddr := de.derpAddr
 	if sentAny && sendCallMeMaybe && derpAddr.IsValid() {
@@ -1106,11 +1118,11 @@ func (de *endpoint) handlePongConnLocked(m *disco.Pong, di *discoInfo, src netip
 	}
 
 	// Currently only CLI ping uses this callback.
-	if sp.cb != nil {
+	if sp.resCB.reply() {
 		if sp.purpose == pingCLI {
-			de.c.populateCLIPingResponseLocked(sp.res, latency, sp.to)
+			de.c.populateCLIPingResponseLocked(sp.resCB.res, latency, sp.to)
 		}
-		go sp.cb(sp.res)
+		go sp.resCB.cb(sp.resCB.res)
 	}
 
 	// Promote this pong response to our current best address if it's lower latency.
@@ -757,7 +757,7 @@ func (c *Conn) Ping(peer tailcfg.NodeView, res *ipnstate.PingResult, size int, c
 		cb(res)
 		return
 	}
-	ep.cliPing(res, size, cb)
+	ep.discoPing(res, size, cb)
 }
 
 // c.mu must be held