mirror of
https://github.com/tailscale/tailscale.git
synced 2024-11-29 04:55:31 +00:00
derp: accept dup clients without closing prior's connection
A public key should only have max one connection to a given
DERP node (or really: one connection to a node in a region).
But if people clone their machine keys (e.g. clone their VM, Raspbery
Pi SD card, etc), then we can get into a situation where a public key
is connected multiple times.
Originally, the DERP server handled this by just kicking out a prior
connections whenever a new one came. But this led to reconnect fights
where 2+ nodes were in hard loops trying to reconnect and kicking out
their peer.
Then a909d37a59
tried to add rate
limiting to how often that dup-kicking can happen, but empirically it
just doesn't work and ~leaks a bunch of goroutines and TCP
connections, tying them up for hour+ while more and more accumulate
and waste memory. Mostly because we were doing a time.Sleep forever
while not reading from their TCP connections.
Instead, just accept multiple connections per public key but track
which is the most recent. And if two both are writing back & forth,
then optionally disable them both. That last part is only enabled in
tests for now. The current default policy is just last-sender-wins
while we gather the next round of stats.
Updates #2751
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in:
parent
debaaebf3b
commit
73280595a8
@ -41,6 +41,7 @@
|
||||
"tailscale.com/client/tailscale"
|
||||
"tailscale.com/disco"
|
||||
"tailscale.com/metrics"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/pad32"
|
||||
@ -77,6 +78,21 @@ func init() {
|
||||
writeTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
|
||||
// of how duplicate connection for the same key are handled.
|
||||
type dupPolicy int8
|
||||
|
||||
const (
|
||||
// lastWriterIsActive is a dupPolicy where the connection
|
||||
// to send traffic for a peer is the active one.
|
||||
lastWriterIsActive dupPolicy = iota
|
||||
|
||||
// disableFighters is a dupPolicy that detects if peers
|
||||
// are trying to send interleaved with each other and
|
||||
// then disables all of them.
|
||||
disableFighters
|
||||
)
|
||||
|
||||
// Server is a DERP server.
|
||||
type Server struct {
|
||||
// WriteTimeout, if non-zero, specifies how long to wait
|
||||
@ -90,9 +106,9 @@ type Server struct {
|
||||
meshKey string
|
||||
limitedLogf logger.Logf
|
||||
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
|
||||
dupPolicy dupPolicy
|
||||
|
||||
// Counters:
|
||||
_ pad32.Four
|
||||
packetsSent, bytesSent expvar.Int
|
||||
packetsRecv, bytesRecv expvar.Int
|
||||
packetsRecvByKind metrics.LabelMap
|
||||
@ -112,9 +128,9 @@ type Server struct {
|
||||
accepts expvar.Int
|
||||
curClients expvar.Int
|
||||
curHomeClients expvar.Int // ones with preferred
|
||||
clientsReplaced expvar.Int
|
||||
clientsReplaceLimited expvar.Int
|
||||
clientsReplaceSleeping expvar.Int
|
||||
dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
|
||||
dupClientConns expvar.Int // current number of connections sharing a public key
|
||||
dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
|
||||
unknownFrames expvar.Int
|
||||
homeMovesIn expvar.Int // established clients announce home server moves in
|
||||
homeMovesOut expvar.Int // established clients announce home server moves out
|
||||
@ -130,7 +146,7 @@ type Server struct {
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
netConns map[Conn]chan struct{} // chan is closed when conn closes
|
||||
clients map[key.Public]*sclient
|
||||
clients map[key.Public]clientSet
|
||||
watchers map[*sclient]bool // mesh peer -> true
|
||||
// clientsMesh tracks all clients in the cluster, both locally
|
||||
// and to mesh peers. If the value is nil, that means the
|
||||
@ -148,6 +164,112 @@ type Server struct {
|
||||
keyOfAddr map[netaddr.IPPort]key.Public
|
||||
}
|
||||
|
||||
// clientSet represents 1 or more *sclients.
|
||||
//
|
||||
// The two implementations are singleClient and *dupClientSet.
|
||||
//
|
||||
// In the common case, client should only have one connection to the
|
||||
// DERP server for a given key. When they're connected multiple times,
|
||||
// we record their set of connections in dupClientSet and keep their
|
||||
// connections open to make them happy (to keep them from spinning,
|
||||
// etc) and keep track of which is the latest connection. If only the last
|
||||
// is sending traffic, that last one is the active connection and it
|
||||
// gets traffic. Otherwise, in the case of a cloned node key, the
|
||||
// whole set of dups doesn't receive data frames.
|
||||
//
|
||||
// All methods should only be called while holding Server.mu.
|
||||
//
|
||||
// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
|
||||
// "health_error" frame to them that'll communicate to the end users
|
||||
// that they cloned a device key, and we'll also surface it in the
|
||||
// admin panel, etc.
|
||||
type clientSet interface {
|
||||
// ActiveClient returns the most recently added client to
|
||||
// the set, as long as it hasn't been disabled, in which
|
||||
// case it returns nil.
|
||||
ActiveClient() *sclient
|
||||
|
||||
// Len returns the number of clients in the set.
|
||||
Len() int
|
||||
|
||||
// ForeachClient calls f for each client in the set.
|
||||
ForeachClient(f func(*sclient))
|
||||
}
|
||||
|
||||
// singleClient is a clientSet of a single connection.
|
||||
// This is the common case.
|
||||
type singleClient struct{ c *sclient }
|
||||
|
||||
func (s singleClient) ActiveClient() *sclient { return s.c }
|
||||
func (s singleClient) Len() int { return 1 }
|
||||
func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
|
||||
|
||||
// A dupClientSet is a clientSet of more than 1 connection.
|
||||
//
|
||||
// This can occur in some reasonable cases (temporarily while users
|
||||
// are changing networks) or in the case of a cloned key. In the
|
||||
// cloned key case, both peers are speaking and the clients get
|
||||
// disabled.
|
||||
//
|
||||
// All fields are guarded by Server.mu.
|
||||
type dupClientSet struct {
|
||||
// set is the set of connected clients for sclient.key.
|
||||
// The values are all true.
|
||||
set map[*sclient]bool
|
||||
|
||||
// last is the most recent addition to set, or nil if the most
|
||||
// recent one has since disconnected and nobody else has send
|
||||
// data since.
|
||||
last *sclient
|
||||
|
||||
// sendHistory is a log of which members of set have sent
|
||||
// frames to the derp server, with adjacent duplicates
|
||||
// removed. When a member of set is removed, the same
|
||||
// element(s) are removed from sendHistory.
|
||||
sendHistory []*sclient
|
||||
}
|
||||
|
||||
func (s *dupClientSet) ActiveClient() *sclient {
|
||||
if s.last != nil && !s.last.isDisabled.Get() {
|
||||
return s.last
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (s *dupClientSet) Len() int { return len(s.set) }
|
||||
func (s *dupClientSet) ForeachClient(f func(*sclient)) {
|
||||
for c := range s.set {
|
||||
f(c)
|
||||
}
|
||||
}
|
||||
|
||||
// removeClient removes c from s and reports whether it was in s
|
||||
// to begin with.
|
||||
func (s *dupClientSet) removeClient(c *sclient) bool {
|
||||
n := len(s.set)
|
||||
delete(s.set, c)
|
||||
if s.last == c {
|
||||
s.last = nil
|
||||
}
|
||||
if len(s.set) == n {
|
||||
return false
|
||||
}
|
||||
|
||||
trim := s.sendHistory[:0]
|
||||
for _, v := range s.sendHistory {
|
||||
if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
|
||||
trim = append(trim, v)
|
||||
}
|
||||
}
|
||||
for i := len(trim); i < len(s.sendHistory); i++ {
|
||||
s.sendHistory[i] = nil
|
||||
}
|
||||
s.sendHistory = trim
|
||||
if s.last == nil && len(s.sendHistory) > 0 {
|
||||
s.last = s.sendHistory[len(s.sendHistory)-1]
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// PacketForwarder is something that can forward packets.
|
||||
//
|
||||
// It's mostly an interface for circular dependency reasons; the
|
||||
@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
|
||||
packetsRecvByKind: metrics.LabelMap{Label: "kind"},
|
||||
packetsDroppedReason: metrics.LabelMap{Label: "reason"},
|
||||
packetsDroppedType: metrics.LabelMap{Label: "type"},
|
||||
clients: map[key.Public]*sclient{},
|
||||
clients: map[key.Public]clientSet{},
|
||||
clientsMesh: map[key.Public]PacketForwarder{},
|
||||
netConns: map[Conn]chan struct{}{},
|
||||
memSys0: ms.Sys,
|
||||
@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert }
|
||||
|
||||
// registerClient notes that client c is now authenticated and ready for packets.
|
||||
//
|
||||
// If c's public key was already connected with a different
|
||||
// connection, the prior one is closed, unless it's fighting rapidly
|
||||
// with another client with the same key, in which case the returned
|
||||
// ok is false, and the caller should wait the provided duration
|
||||
// before trying again.
|
||||
func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) {
|
||||
// If c.key is connected more than once, the earlier connection(s) are
|
||||
// placed in a non-active state where we read from them (primarily to
|
||||
// observe EOFs/timeouts) but won't send them frames on the assumption
|
||||
// that they're dead.
|
||||
func (s *Server) registerClient(c *sclient) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
old := s.clients[c.key]
|
||||
if old == nil {
|
||||
c.logf("adding connection")
|
||||
} else {
|
||||
// Take over the old rate limiter, discarding the one
|
||||
// our caller just made.
|
||||
c.replaceLimiter = old.replaceLimiter
|
||||
if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() {
|
||||
if d := rr.DelayFrom(timeNow()); d > 0 {
|
||||
s.clientsReplaceLimited.Add(1)
|
||||
return false, d
|
||||
}
|
||||
|
||||
set := s.clients[c.key]
|
||||
switch set := set.(type) {
|
||||
case nil:
|
||||
s.clients[c.key] = singleClient{c}
|
||||
case singleClient:
|
||||
s.dupClientKeys.Add(1)
|
||||
s.dupClientConns.Add(2) // both old and new count
|
||||
s.dupClientConnTotal.Add(1)
|
||||
old := set.ActiveClient()
|
||||
old.isDup.Set(true)
|
||||
c.isDup.Set(true)
|
||||
s.clients[c.key] = &dupClientSet{
|
||||
last: c,
|
||||
set: map[*sclient]bool{
|
||||
old: true,
|
||||
c: true,
|
||||
},
|
||||
sendHistory: []*sclient{old},
|
||||
}
|
||||
s.clientsReplaced.Add(1)
|
||||
c.logf("adding connection, replacing %s", old.remoteAddr)
|
||||
go old.nc.Close()
|
||||
case *dupClientSet:
|
||||
s.dupClientConns.Add(1) // the gauge
|
||||
s.dupClientConnTotal.Add(1) // the counter
|
||||
c.isDup.Set(true)
|
||||
set.set[c] = true
|
||||
set.last = c
|
||||
set.sendHistory = append(set.sendHistory, c)
|
||||
}
|
||||
s.clients[c.key] = c
|
||||
|
||||
if _, ok := s.clientsMesh[c.key]; !ok {
|
||||
s.clientsMesh[c.key] = nil // just for varz of total users in cluster
|
||||
}
|
||||
s.keyOfAddr[c.remoteIPPort] = c.key
|
||||
s.curClients.Add(1)
|
||||
s.broadcastPeerStateChangeLocked(c.key, true)
|
||||
return true, 0
|
||||
}
|
||||
|
||||
// broadcastPeerStateChangeLocked enqueues a message to all watchers
|
||||
@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
|
||||
func (s *Server) unregisterClient(c *sclient) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
cur := s.clients[c.key]
|
||||
if cur == c {
|
||||
|
||||
set := s.clients[c.key]
|
||||
switch set := set.(type) {
|
||||
case nil:
|
||||
c.logf("[unexpected]; clients map is empty")
|
||||
case singleClient:
|
||||
c.logf("removing connection")
|
||||
delete(s.clients, c.key)
|
||||
if v, ok := s.clientsMesh[c.key]; ok && v == nil {
|
||||
@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) {
|
||||
s.notePeerGoneFromRegionLocked(c.key)
|
||||
}
|
||||
s.broadcastPeerStateChangeLocked(c.key, false)
|
||||
case *dupClientSet:
|
||||
if set.removeClient(c) {
|
||||
s.dupClientConns.Add(-1)
|
||||
} else {
|
||||
c.logf("[unexpected]; dup client set didn't shrink")
|
||||
}
|
||||
if set.Len() == 1 {
|
||||
s.dupClientConns.Add(-1) // again; for the original one's
|
||||
s.dupClientKeys.Add(-1)
|
||||
var remain *sclient
|
||||
for remain = range set.set {
|
||||
break
|
||||
}
|
||||
if remain == nil {
|
||||
panic("unexpected nil remain from single element dup set")
|
||||
}
|
||||
remain.isDisabled.Set(false)
|
||||
remain.isDup.Set(false)
|
||||
s.clients[c.key] = singleClient{remain}
|
||||
}
|
||||
}
|
||||
|
||||
if c.canMesh {
|
||||
delete(s.watchers, c)
|
||||
}
|
||||
@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
|
||||
// or move them over to the active client (in case a replaced client
|
||||
// connection is being unregistered).
|
||||
for pubKey, connNum := range s.sentTo[key] {
|
||||
if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
|
||||
go peer.requestPeerGoneWrite(key)
|
||||
set, ok := s.clients[pubKey]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
set.ForeachClient(func(peer *sclient) {
|
||||
if peer.connNum == connNum {
|
||||
go peer.requestPeerGoneWrite(key)
|
||||
}
|
||||
})
|
||||
}
|
||||
delete(s.sentTo, key)
|
||||
}
|
||||
@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
|
||||
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
|
||||
peerGone: make(chan key.Public),
|
||||
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
|
||||
|
||||
// Allow kicking out previous connections once a
|
||||
// minute, with a very high burst of 100. Once a
|
||||
// minute is less than the client's 2 minute
|
||||
// inactivity timeout.
|
||||
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
|
||||
}
|
||||
|
||||
if c.canMesh {
|
||||
@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
|
||||
c.info = *clientInfo
|
||||
}
|
||||
|
||||
for {
|
||||
ok, d := s.registerClient(c)
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
s.clientsReplaceSleeping.Add(1)
|
||||
timeSleep(d)
|
||||
s.clientsReplaceSleeping.Add(-1)
|
||||
}
|
||||
s.registerClient(c)
|
||||
defer s.unregisterClient(c)
|
||||
|
||||
err = s.sendServerInfo(c.bw, clientKey)
|
||||
@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error {
|
||||
}
|
||||
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
|
||||
}
|
||||
c.s.noteClientActivity(c)
|
||||
switch ft {
|
||||
case frameNotePreferred:
|
||||
err = c.handleFrameNotePreferred(ft, fl)
|
||||
@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if target, ok := s.clients[targetKey]; ok {
|
||||
c.logf("frameClosePeer closing peer %x", targetKey)
|
||||
go target.nc.Close()
|
||||
if set, ok := s.clients[targetKey]; ok {
|
||||
if set.Len() == 1 {
|
||||
c.logf("frameClosePeer closing peer %x", targetKey)
|
||||
} else {
|
||||
c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len())
|
||||
}
|
||||
set.ForeachClient(func(target *sclient) {
|
||||
go target.nc.Close()
|
||||
})
|
||||
} else {
|
||||
c.logf("frameClosePeer failed to find peer %x", targetKey)
|
||||
}
|
||||
@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
s.packetsForwardedIn.Add(1)
|
||||
|
||||
var dstLen int
|
||||
var dst *sclient
|
||||
|
||||
s.mu.Lock()
|
||||
dst := s.clients[dstKey]
|
||||
if set, ok := s.clients[dstKey]; ok {
|
||||
dstLen = set.Len()
|
||||
dst = set.ActiveClient()
|
||||
}
|
||||
if dst != nil {
|
||||
s.notePeerSendLocked(srcKey, dst)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if dst == nil {
|
||||
s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd)
|
||||
reason := dropReasonUnknownDestOnFwd
|
||||
if dstLen > 1 {
|
||||
reason = dropReasonDupClient
|
||||
}
|
||||
s.recordDrop(contents, srcKey, dstKey, reason)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
|
||||
var fwd PacketForwarder
|
||||
var dstLen int
|
||||
var dst *sclient
|
||||
|
||||
s.mu.Lock()
|
||||
dst := s.clients[dstKey]
|
||||
if dst == nil {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
} else {
|
||||
if set, ok := s.clients[dstKey]; ok {
|
||||
dstLen = set.Len()
|
||||
dst = set.ActiveClient()
|
||||
}
|
||||
if dst != nil {
|
||||
s.notePeerSendLocked(c.key, dst)
|
||||
} else if dstLen < 1 {
|
||||
fwd = s.clientsMesh[dstKey]
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest)
|
||||
reason := dropReasonUnknownDest
|
||||
if dstLen > 1 {
|
||||
reason = dropReasonDupClient
|
||||
}
|
||||
s.recordDrop(contents, c.key, dstKey, reason)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -741,6 +916,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
dropReasonQueueHead // destination queue is full, dropped packet at queue head
|
||||
dropReasonQueueTail // destination queue is full, dropped packet at queue tail
|
||||
dropReasonWriteError // OS write() failed
|
||||
dropReasonDupClient // the public key is connected 2+ times (active/active, fighting)
|
||||
)
|
||||
|
||||
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
|
||||
@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) noteClientActivity(c *sclient) {
|
||||
if !c.isDup.Get() {
|
||||
// Fast path for clients that aren't in a dup set.
|
||||
return
|
||||
}
|
||||
if c.isDisabled.Get() {
|
||||
// If they're already disabled, no point checking more.
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
ds, ok := s.clients[c.key].(*dupClientSet)
|
||||
if !ok {
|
||||
// It became unduped in between the isDup fast path check above
|
||||
// and the mutex check. Nothing to do.
|
||||
return
|
||||
}
|
||||
|
||||
if s.dupPolicy == lastWriterIsActive {
|
||||
ds.last = c
|
||||
} else if ds.last == nil {
|
||||
// If we didn't have a primary, let the current
|
||||
// speaker be the primary.
|
||||
ds.last = c
|
||||
}
|
||||
|
||||
if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c {
|
||||
// The client c was the last client to make activity
|
||||
// in this set and it was already recorded. Nothing to
|
||||
// do.
|
||||
return
|
||||
}
|
||||
|
||||
// If we saw this connection send previously, then consider
|
||||
// the group fighting and disable them all.
|
||||
if s.dupPolicy == disableFighters {
|
||||
for _, prior := range ds.sendHistory {
|
||||
if prior == c {
|
||||
ds.ForeachClient(func(c *sclient) {
|
||||
c.isDisabled.Set(true)
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Append this client to the list of clients who spoke last.
|
||||
ds.sendHistory = append(ds.sendHistory, c)
|
||||
}
|
||||
|
||||
type serverInfo struct {
|
||||
Version int `json:"version,omitempty"`
|
||||
}
|
||||
@ -979,14 +1206,16 @@ type sclient struct {
|
||||
key key.Public
|
||||
info clientInfo
|
||||
logf logger.Logf
|
||||
done <-chan struct{} // closed when connection closes
|
||||
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
|
||||
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
|
||||
sendQueue chan pkt // packets queued to this client; never closed
|
||||
discoSendQueue chan pkt // important packets queued to this client; never closed
|
||||
peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
|
||||
meshUpdate chan struct{} // write request to write peerStateChange
|
||||
canMesh bool // clientInfo had correct mesh token for inter-region routing
|
||||
done <-chan struct{} // closed when connection closes
|
||||
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
|
||||
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
|
||||
sendQueue chan pkt // packets queued to this client; never closed
|
||||
discoSendQueue chan pkt // important packets queued to this client; never closed
|
||||
peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
|
||||
meshUpdate chan struct{} // write request to write peerStateChange
|
||||
canMesh bool // clientInfo had correct mesh token for inter-region routing
|
||||
isDup syncs.AtomicBool // whether more than 1 sclient for key is connected
|
||||
isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups
|
||||
|
||||
// replaceLimiter controls how quickly two connections with
|
||||
// the same client key can kick each other off the server by
|
||||
@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var {
|
||||
m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) }))
|
||||
m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) }))
|
||||
m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) }))
|
||||
m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
|
||||
m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
|
||||
m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
|
||||
m.Set("accepts", &s.accepts)
|
||||
m.Set("clients_replaced", &s.clientsReplaced)
|
||||
m.Set("clients_replace_limited", &s.clientsReplaceLimited)
|
||||
m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping)
|
||||
m.Set("bytes_received", &s.bytesRecv)
|
||||
m.Set("bytes_sent", &s.bytesSent)
|
||||
m.Set("packets_dropped", &s.packetsDropped)
|
||||
|
@ -662,7 +662,7 @@ func pubAll(b byte) (ret key.Public) {
|
||||
|
||||
func TestForwarderRegistration(t *testing.T) {
|
||||
s := &Server{
|
||||
clients: make(map[key.Public]*sclient),
|
||||
clients: make(map[key.Public]clientSet),
|
||||
clientsMesh: map[key.Public]PacketForwarder{},
|
||||
}
|
||||
want := func(want map[key.Public]PacketForwarder) {
|
||||
@ -745,7 +745,7 @@ func TestForwarderRegistration(t *testing.T) {
|
||||
key: u1,
|
||||
logf: logger.Discard,
|
||||
}
|
||||
s.clients[u1] = u1c
|
||||
s.clients[u1] = singleClient{u1c}
|
||||
s.RemovePacketForwarder(u1, testFwd(100))
|
||||
want(map[key.Public]PacketForwarder{
|
||||
u1: nil,
|
||||
@ -765,7 +765,7 @@ func TestForwarderRegistration(t *testing.T) {
|
||||
// Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard
|
||||
// that they're also connected to a peer of ours. That sholdn't transition the forwarder
|
||||
// from nil to the new one, not a multiForwarder.
|
||||
s.clients[u1] = u1c
|
||||
s.clients[u1] = singleClient{u1c}
|
||||
s.clientsMesh[u1] = nil
|
||||
want(map[key.Public]PacketForwarder{
|
||||
u1: nil,
|
||||
@ -851,125 +851,248 @@ func TestClientSendPong(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestServerReplaceClients(t *testing.T) {
|
||||
defer func() {
|
||||
timeSleep = time.Sleep
|
||||
timeNow = time.Now
|
||||
}()
|
||||
func TestServerDupClients(t *testing.T) {
|
||||
serverPriv := newPrivateKey(t)
|
||||
var s *Server
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
now = time.Unix(123, 0)
|
||||
sleeps int
|
||||
slept time.Duration
|
||||
)
|
||||
timeSleep = func(d time.Duration) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
sleeps++
|
||||
slept += d
|
||||
now = now.Add(d)
|
||||
}
|
||||
timeNow = func() time.Time {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return now
|
||||
}
|
||||
clientPriv := newPrivateKey(t)
|
||||
clientPub := clientPriv.Public()
|
||||
|
||||
serverPrivateKey := newPrivateKey(t)
|
||||
var logger logger.Logf = logger.Discard
|
||||
const debug = false
|
||||
if debug {
|
||||
logger = t.Logf
|
||||
}
|
||||
var c1, c2, c3 *sclient
|
||||
var clientName map[*sclient]string
|
||||
|
||||
s := NewServer(serverPrivateKey, logger)
|
||||
defer s.Close()
|
||||
|
||||
priv := newPrivateKey(t)
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ln.Close()
|
||||
|
||||
connNum := 0
|
||||
connect := func() *Client {
|
||||
connNum++
|
||||
cout, err := net.Dial("tcp", ln.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
// run starts a new test case and resets clients back to their zero values.
|
||||
run := func(name string, dupPolicy dupPolicy, f func(t *testing.T)) {
|
||||
s = NewServer(serverPriv, t.Logf)
|
||||
s.dupPolicy = dupPolicy
|
||||
c1 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c1: ")}
|
||||
c2 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c2: ")}
|
||||
c3 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c3: ")}
|
||||
clientName = map[*sclient]string{
|
||||
c1: "c1",
|
||||
c2: "c2",
|
||||
c3: "c3",
|
||||
}
|
||||
|
||||
cin, err := ln.Accept()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
|
||||
go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", connNum))
|
||||
|
||||
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
|
||||
c, err := NewClient(priv, cout, brw, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("client %d: %v", connNum, err)
|
||||
}
|
||||
return c
|
||||
t.Run(name, f)
|
||||
}
|
||||
|
||||
wantVar := func(v *expvar.Int, want int64) {
|
||||
runBothWays := func(name string, f func(t *testing.T)) {
|
||||
run(name+"_disablefighters", disableFighters, f)
|
||||
run(name+"_lastwriteractive", lastWriterIsActive, f)
|
||||
}
|
||||
wantSingleClient := func(t *testing.T, want *sclient) {
|
||||
t.Helper()
|
||||
if got := v.Value(); got != want {
|
||||
t.Errorf("got %d; want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
wantClosed := func(c *Client) {
|
||||
t.Helper()
|
||||
for {
|
||||
m, err := c.Recv()
|
||||
if err != nil {
|
||||
t.Logf("got expected error: %v", err)
|
||||
switch s := s.clients[want.key].(type) {
|
||||
case singleClient:
|
||||
if s.c != want {
|
||||
t.Error("wrong single client")
|
||||
return
|
||||
}
|
||||
switch m.(type) {
|
||||
case ServerInfoMessage:
|
||||
continue
|
||||
default:
|
||||
t.Fatalf("client got %T; wanted an error", m)
|
||||
if want.isDup.Get() {
|
||||
t.Errorf("unexpected isDup on singleClient")
|
||||
}
|
||||
if want.isDisabled.Get() {
|
||||
t.Errorf("unexpected isDisabled on singleClient")
|
||||
}
|
||||
case nil:
|
||||
t.Error("no clients for key")
|
||||
case *dupClientSet:
|
||||
t.Error("unexpected multiple clients for key")
|
||||
}
|
||||
}
|
||||
wantNoClient := func(t *testing.T) {
|
||||
t.Helper()
|
||||
switch s := s.clients[clientPub].(type) {
|
||||
case nil:
|
||||
// Good.
|
||||
return
|
||||
default:
|
||||
t.Errorf("got %T; want empty", s)
|
||||
}
|
||||
}
|
||||
wantDupSet := func(t *testing.T) *dupClientSet {
|
||||
t.Helper()
|
||||
switch s := s.clients[clientPub].(type) {
|
||||
case *dupClientSet:
|
||||
return s
|
||||
default:
|
||||
t.Fatalf("wanted dup set; got %T", s)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
wantActive := func(t *testing.T, want *sclient) {
|
||||
t.Helper()
|
||||
set, ok := s.clients[clientPub]
|
||||
if !ok {
|
||||
t.Error("no set for key")
|
||||
return
|
||||
}
|
||||
got := set.ActiveClient()
|
||||
if got != want {
|
||||
t.Errorf("active client = %q; want %q", clientName[got], clientName[want])
|
||||
}
|
||||
}
|
||||
checkDup := func(t *testing.T, c *sclient, want bool) {
|
||||
t.Helper()
|
||||
if got := c.isDup.Get(); got != want {
|
||||
t.Errorf("client %q isDup = %v; want %v", clientName[c], got, want)
|
||||
}
|
||||
}
|
||||
checkDisabled := func(t *testing.T, c *sclient, want bool) {
|
||||
t.Helper()
|
||||
if got := c.isDisabled.Get(); got != want {
|
||||
t.Errorf("client %q isDisabled = %v; want %v", clientName[c], got, want)
|
||||
}
|
||||
}
|
||||
wantDupConns := func(t *testing.T, want int) {
|
||||
t.Helper()
|
||||
if got := s.dupClientConns.Value(); got != int64(want) {
|
||||
t.Errorf("dupClientConns = %v; want %v", got, want)
|
||||
}
|
||||
}
|
||||
wantDupKeys := func(t *testing.T, want int) {
|
||||
t.Helper()
|
||||
if got := s.dupClientKeys.Value(); got != int64(want) {
|
||||
t.Errorf("dupClientKeys = %v; want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
c1 := connect()
|
||||
waitConnect(t, c1)
|
||||
c2 := connect()
|
||||
waitConnect(t, c2)
|
||||
wantVar(&s.clientsReplaced, 1)
|
||||
wantClosed(c1)
|
||||
// Common case: a single client comes and goes, with no dups.
|
||||
runBothWays("one_comes_and_goes", func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
s.registerClient(c1)
|
||||
wantSingleClient(t, c1)
|
||||
s.unregisterClient(c1)
|
||||
wantNoClient(t)
|
||||
})
|
||||
|
||||
for i := 0; i < 100+5; i++ {
|
||||
c := connect()
|
||||
defer c.nc.Close()
|
||||
if s.clientsReplaceLimited.Value() == 0 && i < 90 {
|
||||
continue
|
||||
}
|
||||
t.Logf("for %d: replaced=%d, limited=%d, sleeping=%d", i,
|
||||
s.clientsReplaced.Value(),
|
||||
s.clientsReplaceLimited.Value(),
|
||||
s.clientsReplaceSleeping.Value(),
|
||||
)
|
||||
}
|
||||
// A still somewhat common case: a single client was
|
||||
// connected and then their wifi dies or laptop closes
|
||||
// or they switch networks and connect from a
|
||||
// different network. They have two connections but
|
||||
// it's not very bad. Only their new one is
|
||||
// active. The last one, being dead, doesn't send and
|
||||
// thus the new one doesn't get disabled.
|
||||
runBothWays("small_overlap_replacement", func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
s.registerClient(c1)
|
||||
wantSingleClient(t, c1)
|
||||
wantActive(t, c1)
|
||||
wantDupKeys(t, 0)
|
||||
wantDupKeys(t, 0)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if sleeps == 0 {
|
||||
t.Errorf("no sleeps")
|
||||
}
|
||||
if slept == 0 {
|
||||
t.Errorf("total sleep duration was 0")
|
||||
}
|
||||
s.registerClient(c2) // wifi dies; c2 replacement connects
|
||||
wantDupSet(t)
|
||||
wantDupConns(t, 2)
|
||||
wantDupKeys(t, 1)
|
||||
checkDup(t, c1, true)
|
||||
checkDup(t, c2, true)
|
||||
checkDisabled(t, c1, false)
|
||||
checkDisabled(t, c2, false)
|
||||
wantActive(t, c2) // sends go to the replacement
|
||||
|
||||
s.unregisterClient(c1) // c1 finally times out
|
||||
wantSingleClient(t, c2)
|
||||
checkDup(t, c2, false) // c2 is longer a dup
|
||||
wantActive(t, c2)
|
||||
wantDupConns(t, 0)
|
||||
wantDupKeys(t, 0)
|
||||
})
|
||||
|
||||
// Key cloning situation with concurrent clients, both trying
|
||||
// to write.
|
||||
run("concurrent_dups_get_disabled", disableFighters, func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
s.registerClient(c1)
|
||||
wantSingleClient(t, c1)
|
||||
wantActive(t, c1)
|
||||
s.registerClient(c2)
|
||||
wantDupSet(t)
|
||||
wantDupKeys(t, 1)
|
||||
wantDupConns(t, 2)
|
||||
wantActive(t, c2)
|
||||
checkDup(t, c1, true)
|
||||
checkDup(t, c2, true)
|
||||
checkDisabled(t, c1, false)
|
||||
checkDisabled(t, c2, false)
|
||||
|
||||
s.noteClientActivity(c2)
|
||||
checkDisabled(t, c1, false)
|
||||
checkDisabled(t, c2, false)
|
||||
s.noteClientActivity(c1)
|
||||
checkDisabled(t, c1, true)
|
||||
checkDisabled(t, c2, true)
|
||||
wantActive(t, nil)
|
||||
|
||||
s.registerClient(c3)
|
||||
wantActive(t, c3)
|
||||
checkDisabled(t, c3, false)
|
||||
wantDupKeys(t, 1)
|
||||
wantDupConns(t, 3)
|
||||
|
||||
s.unregisterClient(c3)
|
||||
wantActive(t, nil)
|
||||
wantDupKeys(t, 1)
|
||||
wantDupConns(t, 2)
|
||||
|
||||
s.unregisterClient(c2)
|
||||
wantSingleClient(t, c1)
|
||||
wantDupKeys(t, 0)
|
||||
wantDupConns(t, 0)
|
||||
})
|
||||
|
||||
// Key cloning with an A->B->C->A series instead.
|
||||
run("concurrent_dups_three_parties", disableFighters, func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
s.registerClient(c1)
|
||||
s.registerClient(c2)
|
||||
s.registerClient(c3)
|
||||
s.noteClientActivity(c1)
|
||||
checkDisabled(t, c1, true)
|
||||
checkDisabled(t, c2, true)
|
||||
checkDisabled(t, c3, true)
|
||||
wantActive(t, nil)
|
||||
})
|
||||
|
||||
run("activity_promotes_primary_when_nil", disableFighters, func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
|
||||
// Last registered client is the active one...
|
||||
s.registerClient(c1)
|
||||
wantActive(t, c1)
|
||||
s.registerClient(c2)
|
||||
wantActive(t, c2)
|
||||
s.registerClient(c3)
|
||||
s.noteClientActivity(c2)
|
||||
wantActive(t, c3)
|
||||
|
||||
// But if the last one goes away, the one with the
|
||||
// most recent activity wins.
|
||||
s.unregisterClient(c3)
|
||||
wantActive(t, c2)
|
||||
})
|
||||
|
||||
run("concurrent_dups_three_parties_last_writer", lastWriterIsActive, func(t *testing.T) {
|
||||
wantNoClient(t)
|
||||
|
||||
s.registerClient(c1)
|
||||
wantActive(t, c1)
|
||||
s.registerClient(c2)
|
||||
wantActive(t, c2)
|
||||
|
||||
s.noteClientActivity(c1)
|
||||
checkDisabled(t, c1, false)
|
||||
checkDisabled(t, c2, false)
|
||||
wantActive(t, c1)
|
||||
|
||||
s.noteClientActivity(c2)
|
||||
checkDisabled(t, c1, false)
|
||||
checkDisabled(t, c2, false)
|
||||
wantActive(t, c2)
|
||||
|
||||
s.unregisterClient(c2)
|
||||
checkDisabled(t, c1, false)
|
||||
wantActive(t, c1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLimiter(t *testing.T) {
|
||||
|
@ -18,11 +18,12 @@ func _() {
|
||||
_ = x[dropReasonQueueHead-3]
|
||||
_ = x[dropReasonQueueTail-4]
|
||||
_ = x[dropReasonWriteError-5]
|
||||
_ = x[dropReasonDupClient-6]
|
||||
}
|
||||
|
||||
const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteError"
|
||||
const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient"
|
||||
|
||||
var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59}
|
||||
var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68}
|
||||
|
||||
func (i dropReason) String() string {
|
||||
if i < 0 || i >= dropReason(len(_dropReason_index)-1) {
|
||||
|
Loading…
Reference in New Issue
Block a user