derp: accept dup clients without closing prior's connection
A public key should have at most one connection to a given
DERP node (or really: one connection to a node in a region).
But if people clone their machine keys (e.g. clone their VM, Raspberry
Pi SD card, etc.), then we can get into a situation where a public key
is connected multiple times.

Originally, the DERP server handled this by just kicking out the prior
connection whenever a new one arrived. But this led to reconnect fights
where 2+ nodes were in hard loops trying to reconnect and kicking out
their peer.

Then a909d37a59 tried to add rate limiting to how often that
dup-kicking can happen, but empirically it just doesn't work: it
effectively leaks a bunch of goroutines and TCP connections, tying them
up for an hour or more while more and more accumulate and waste memory,
mostly because we were doing a time.Sleep forever while not reading
from their TCP connections.

Instead, just accept multiple connections per public key but track
which is the most recent. And if two connections are both writing back
and forth, then optionally disable them both. That last part is only
enabled in tests for now. The current default policy is just
last-sender-wins while we gather the next round of stats.
Updates #2751
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
This commit is contained in: parent debaaebf3b, commit 73280595a8
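
The approach above can be illustrated with a small, self-contained sketch. This is a simplified model of the bookkeeping, not the derp package's actual code; the names (connSet, noteSend, active) are made up for illustration. The idea: keep every live connection for a key, remember the most recent sender, and under the disable-fighters policy shut the whole set off once two connections interleave writes.

package main

import "fmt"

// dupPolicy mirrors the idea in the patch: either the most recent
// writer is the active connection, or interleaving writers get the
// whole set disabled. This is an illustrative model only.
type dupPolicy int

const (
	lastWriterIsActive dupPolicy = iota
	disableFighters
)

// connSet is a hypothetical stand-in for the per-key set of
// connections sharing one public key.
type connSet struct {
	policy      dupPolicy
	last        string   // most recent sender
	sendHistory []string // senders, with adjacent duplicates removed
	disabled    bool
}

// noteSend records that conn wrote a frame and applies the policy.
func (s *connSet) noteSend(conn string) {
	s.last = conn
	if n := len(s.sendHistory); n > 0 && s.sendHistory[n-1] == conn {
		return // same sender as last time; not a fight
	}
	if s.policy == disableFighters {
		for _, prior := range s.sendHistory {
			if prior == conn {
				// conn already spoke earlier and someone else spoke
				// in between: the connections are fighting.
				s.disabled = true
				break
			}
		}
	}
	s.sendHistory = append(s.sendHistory, conn)
}

// active returns which connection should receive traffic, if any.
func (s *connSet) active() string {
	if s.disabled {
		return ""
	}
	return s.last
}

func main() {
	s := &connSet{policy: disableFighters}
	s.noteSend("laptop")
	s.noteSend("cloned-pi") // a clone with the same key starts writing
	s.noteSend("laptop")    // interleaved again: fighting detected
	fmt.Println(s.active() == "", "-> both connections disabled")
}
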
derp/derp_server.go:

@@ -41,6 +41,7 @@
 	"tailscale.com/client/tailscale"
 	"tailscale.com/disco"
 	"tailscale.com/metrics"
+	"tailscale.com/syncs"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 	"tailscale.com/types/pad32"
@@ -77,6 +78,21 @@ func init() {
 	writeTimeout = 2 * time.Second
 )
+
+// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
+// of how duplicate connection for the same key are handled.
+type dupPolicy int8
+
+const (
+	// lastWriterIsActive is a dupPolicy where the connection
+	// to send traffic for a peer is the active one.
+	lastWriterIsActive dupPolicy = iota
+
+	// disableFighters is a dupPolicy that detects if peers
+	// are trying to send interleaved with each other and
+	// then disables all of them.
+	disableFighters
+)

 // Server is a DERP server.
 type Server struct {
 	// WriteTimeout, if non-zero, specifies how long to wait
@@ -90,9 +106,9 @@ type Server struct {
 	meshKey     string
 	limitedLogf logger.Logf
 	metaCert    []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
+	dupPolicy   dupPolicy

 	// Counters:
-	_                      pad32.Four
 	packetsSent, bytesSent expvar.Int
 	packetsRecv, bytesRecv expvar.Int
 	packetsRecvByKind      metrics.LabelMap
@@ -112,9 +128,9 @@ type Server struct {
 	accepts            expvar.Int
 	curClients         expvar.Int
 	curHomeClients     expvar.Int // ones with preferred
-	clientsReplaced        expvar.Int
-	clientsReplaceLimited  expvar.Int
-	clientsReplaceSleeping expvar.Int
+	dupClientKeys      expvar.Int // current number of public keys we have 2+ connections for
+	dupClientConns     expvar.Int // current number of connections sharing a public key
+	dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
 	unknownFrames      expvar.Int
 	homeMovesIn        expvar.Int // established clients announce home server moves in
 	homeMovesOut       expvar.Int // established clients announce home server moves out
@@ -130,7 +146,7 @@ type Server struct {
 	mu       sync.Mutex
 	closed   bool
 	netConns map[Conn]chan struct{} // chan is closed when conn closes
-	clients  map[key.Public]*sclient
+	clients  map[key.Public]clientSet
 	watchers map[*sclient]bool // mesh peer -> true
 	// clientsMesh tracks all clients in the cluster, both locally
 	// and to mesh peers. If the value is nil, that means the
@@ -148,6 +164,112 @@ type Server struct {
 	keyOfAddr map[netaddr.IPPort]key.Public
 }

+// clientSet represents 1 or more *sclients.
+//
+// The two implementations are singleClient and *dupClientSet.
+//
+// In the common case, client should only have one connection to the
+// DERP server for a given key. When they're connected multiple times,
+// we record their set of connections in dupClientSet and keep their
+// connections open to make them happy (to keep them from spinning,
+// etc) and keep track of which is the latest connection. If only the last
+// is sending traffic, that last one is the active connection and it
+// gets traffic. Otherwise, in the case of a cloned node key, the
+// whole set of dups doesn't receive data frames.
+//
+// All methods should only be called while holding Server.mu.
+//
+// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
+// "health_error" frame to them that'll communicate to the end users
+// that they cloned a device key, and we'll also surface it in the
+// admin panel, etc.
+type clientSet interface {
+	// ActiveClient returns the most recently added client to
+	// the set, as long as it hasn't been disabled, in which
+	// case it returns nil.
+	ActiveClient() *sclient
+
+	// Len returns the number of clients in the set.
+	Len() int
+
+	// ForeachClient calls f for each client in the set.
+	ForeachClient(f func(*sclient))
+}
+
+// singleClient is a clientSet of a single connection.
+// This is the common case.
+type singleClient struct{ c *sclient }
+
+func (s singleClient) ActiveClient() *sclient         { return s.c }
+func (s singleClient) Len() int                       { return 1 }
+func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
+
+// A dupClientSet is a clientSet of more than 1 connection.
+//
+// This can occur in some reasonable cases (temporarily while users
+// are changing networks) or in the case of a cloned key. In the
+// cloned key case, both peers are speaking and the clients get
+// disabled.
+//
+// All fields are guarded by Server.mu.
+type dupClientSet struct {
+	// set is the set of connected clients for sclient.key.
+	// The values are all true.
+	set map[*sclient]bool
+
+	// last is the most recent addition to set, or nil if the most
+	// recent one has since disconnected and nobody else has send
+	// data since.
+	last *sclient
+
+	// sendHistory is a log of which members of set have sent
+	// frames to the derp server, with adjacent duplicates
+	// removed. When a member of set is removed, the same
+	// element(s) are removed from sendHistory.
+	sendHistory []*sclient
+}
+
+func (s *dupClientSet) ActiveClient() *sclient {
+	if s.last != nil && !s.last.isDisabled.Get() {
+		return s.last
+	}
+	return nil
+}
+func (s *dupClientSet) Len() int { return len(s.set) }
+func (s *dupClientSet) ForeachClient(f func(*sclient)) {
+	for c := range s.set {
+		f(c)
+	}
+}
+
+// removeClient removes c from s and reports whether it was in s
+// to begin with.
+func (s *dupClientSet) removeClient(c *sclient) bool {
+	n := len(s.set)
+	delete(s.set, c)
+	if s.last == c {
+		s.last = nil
+	}
+	if len(s.set) == n {
+		return false
+	}
+
+	trim := s.sendHistory[:0]
+	for _, v := range s.sendHistory {
+		if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
+			trim = append(trim, v)
+		}
+	}
+	for i := len(trim); i < len(s.sendHistory); i++ {
+		s.sendHistory[i] = nil
+	}
+	s.sendHistory = trim
+	if s.last == nil && len(s.sendHistory) > 0 {
+		s.last = s.sendHistory[len(s.sendHistory)-1]
+	}
+	return true
+}
+
 // PacketForwarder is something that can forward packets.
 //
 // It's mostly an interface for circular dependency reasons; the
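
The clientSet interface above keeps the rest of the server indifferent to whether a key has one connection or several. A minimal sketch of how a caller inside the package might use it; the sendOrClose helper is hypothetical, shown only to contrast ActiveClient with ForeachClient:

// sendOrClose is a hypothetical caller: frames go only to the active
// connection, while administrative actions apply to every connection
// in the set.
func sendOrClose(set clientSet, frame []byte, closeAll bool) {
	if closeAll {
		set.ForeachClient(func(c *sclient) {
			go c.nc.Close() // mirrors handleFrameClosePeer in this patch
		})
		return
	}
	if dst := set.ActiveClient(); dst != nil {
		// dst is the most recent, non-disabled connection; a nil result
		// means the dups are fighting and the frame would be dropped.
		_ = frame // enqueue the frame to dst here
	}
}
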
@@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 		packetsRecvByKind:    metrics.LabelMap{Label: "kind"},
 		packetsDroppedReason: metrics.LabelMap{Label: "reason"},
 		packetsDroppedType:   metrics.LabelMap{Label: "type"},
-		clients:              map[key.Public]*sclient{},
+		clients:              map[key.Public]clientSet{},
 		clientsMesh:          map[key.Public]PacketForwarder{},
 		netConns:             map[Conn]chan struct{}{},
 		memSys0:              ms.Sys,
@@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert }

 // registerClient notes that client c is now authenticated and ready for packets.
 //
-// If c's public key was already connected with a different
-// connection, the prior one is closed, unless it's fighting rapidly
-// with another client with the same key, in which case the returned
-// ok is false, and the caller should wait the provided duration
-// before trying again.
-func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) {
+// If c.key is connected more than once, the earlier connection(s) are
+// placed in a non-active state where we read from them (primarily to
+// observe EOFs/timeouts) but won't send them frames on the assumption
+// that they're dead.
+func (s *Server) registerClient(c *sclient) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	old := s.clients[c.key]
-	if old == nil {
-		c.logf("adding connection")
-	} else {
-		// Take over the old rate limiter, discarding the one
-		// our caller just made.
-		c.replaceLimiter = old.replaceLimiter
-		if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() {
-			if d := rr.DelayFrom(timeNow()); d > 0 {
-				s.clientsReplaceLimited.Add(1)
-				return false, d
-			}
-		}
-		s.clientsReplaced.Add(1)
-		c.logf("adding connection, replacing %s", old.remoteAddr)
-		go old.nc.Close()
-	}
-	s.clients[c.key] = c
+
+	set := s.clients[c.key]
+	switch set := set.(type) {
+	case nil:
+		s.clients[c.key] = singleClient{c}
+	case singleClient:
+		s.dupClientKeys.Add(1)
+		s.dupClientConns.Add(2) // both old and new count
+		s.dupClientConnTotal.Add(1)
+		old := set.ActiveClient()
+		old.isDup.Set(true)
+		c.isDup.Set(true)
+		s.clients[c.key] = &dupClientSet{
+			last: c,
+			set: map[*sclient]bool{
+				old: true,
+				c:   true,
+			},
+			sendHistory: []*sclient{old},
+		}
+	case *dupClientSet:
+		s.dupClientConns.Add(1)     // the gauge
+		s.dupClientConnTotal.Add(1) // the counter
+		c.isDup.Set(true)
+		set.set[c] = true
+		set.last = c
+		set.sendHistory = append(set.sendHistory, c)
+	}
 	if _, ok := s.clientsMesh[c.key]; !ok {
 		s.clientsMesh[c.key] = nil // just for varz of total users in cluster
 	}
 	s.keyOfAddr[c.remoteIPPort] = c.key
 	s.curClients.Add(1)
 	s.broadcastPeerStateChangeLocked(c.key, true)
-	return true, 0
 }

 // broadcastPeerStateChangeLocked enqueues a message to all watchers
@@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
 func (s *Server) unregisterClient(c *sclient) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	cur := s.clients[c.key]
-	if cur == c {
+
+	set := s.clients[c.key]
+	switch set := set.(type) {
+	case nil:
+		c.logf("[unexpected]; clients map is empty")
+	case singleClient:
 		c.logf("removing connection")
 		delete(s.clients, c.key)
 		if v, ok := s.clientsMesh[c.key]; ok && v == nil {
@@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) {
 			s.notePeerGoneFromRegionLocked(c.key)
 		}
 		s.broadcastPeerStateChangeLocked(c.key, false)
+	case *dupClientSet:
+		if set.removeClient(c) {
+			s.dupClientConns.Add(-1)
+		} else {
+			c.logf("[unexpected]; dup client set didn't shrink")
+		}
+		if set.Len() == 1 {
+			s.dupClientConns.Add(-1) // again; for the original one's
+			s.dupClientKeys.Add(-1)
+			var remain *sclient
+			for remain = range set.set {
+				break
+			}
+			if remain == nil {
+				panic("unexpected nil remain from single element dup set")
+			}
+			remain.isDisabled.Set(false)
+			remain.isDup.Set(false)
+			s.clients[c.key] = singleClient{remain}
+		}
 	}

 	if c.canMesh {
 		delete(s.watchers, c)
 	}
@@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
 	// or move them over to the active client (in case a replaced client
 	// connection is being unregistered).
 	for pubKey, connNum := range s.sentTo[key] {
-		if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
-			go peer.requestPeerGoneWrite(key)
-		}
+		set, ok := s.clients[pubKey]
+		if !ok {
+			continue
+		}
+		set.ForeachClient(func(peer *sclient) {
+			if peer.connNum == connNum {
+				go peer.requestPeerGoneWrite(key)
+			}
+		})
 	}
 	delete(s.sentTo, key)
 }
@@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		discoSendQueue: make(chan pkt, perClientSendQueueDepth),
 		peerGone:       make(chan key.Public),
 		canMesh:        clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
-
-		// Allow kicking out previous connections once a
-		// minute, with a very high burst of 100. Once a
-		// minute is less than the client's 2 minute
-		// inactivity timeout.
-		replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
 	}

 	if c.canMesh {
@@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		c.info = *clientInfo
 	}

-	for {
-		ok, d := s.registerClient(c)
-		if ok {
-			break
-		}
-		s.clientsReplaceSleeping.Add(1)
-		timeSleep(d)
-		s.clientsReplaceSleeping.Add(-1)
-	}
+	s.registerClient(c)
 	defer s.unregisterClient(c)

 	err = s.sendServerInfo(c.bw, clientKey)
@@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error {
 			}
 			return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
 		}
+		c.s.noteClientActivity(c)
 		switch ft {
 		case frameNotePreferred:
 			err = c.handleFrameNotePreferred(ft, fl)
@@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()

-	if target, ok := s.clients[targetKey]; ok {
-		c.logf("frameClosePeer closing peer %x", targetKey)
-		go target.nc.Close()
+	if set, ok := s.clients[targetKey]; ok {
+		if set.Len() == 1 {
+			c.logf("frameClosePeer closing peer %x", targetKey)
+		} else {
+			c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len())
+		}
+		set.ForeachClient(func(target *sclient) {
+			go target.nc.Close()
+		})
 	} else {
 		c.logf("frameClosePeer failed to find peer %x", targetKey)
 	}
@@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	}
 	s.packetsForwardedIn.Add(1)

+	var dstLen int
+	var dst *sclient
+
 	s.mu.Lock()
-	dst := s.clients[dstKey]
+	if set, ok := s.clients[dstKey]; ok {
+		dstLen = set.Len()
+		dst = set.ActiveClient()
+	}
 	if dst != nil {
 		s.notePeerSendLocked(srcKey, dst)
 	}
 	s.mu.Unlock()

 	if dst == nil {
-		s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd)
+		reason := dropReasonUnknownDestOnFwd
+		if dstLen > 1 {
+			reason = dropReasonDupClient
+		}
+		s.recordDrop(contents, srcKey, dstKey, reason)
 		return nil
 	}
@@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	}

 	var fwd PacketForwarder
+	var dstLen int
+	var dst *sclient
+
 	s.mu.Lock()
-	dst := s.clients[dstKey]
-	if dst == nil {
-		fwd = s.clientsMesh[dstKey]
-	} else {
+	if set, ok := s.clients[dstKey]; ok {
+		dstLen = set.Len()
+		dst = set.ActiveClient()
+	}
+	if dst != nil {
 		s.notePeerSendLocked(c.key, dst)
+	} else if dstLen < 1 {
+		fwd = s.clientsMesh[dstKey]
 	}
 	s.mu.Unlock()
@@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 		}
 		return nil
 	}
-	s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest)
+	reason := dropReasonUnknownDest
+	if dstLen > 1 {
+		reason = dropReasonDupClient
+	}
+	s.recordDrop(contents, c.key, dstKey, reason)
 	return nil
 }
@@ -741,6 +916,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	dropReasonQueueHead  // destination queue is full, dropped packet at queue head
 	dropReasonQueueTail  // destination queue is full, dropped packet at queue tail
 	dropReasonWriteError // OS write() failed
+	dropReasonDupClient  // the public key is connected 2+ times (active/active, fighting)
 )

 func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
@@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
 	return err
 }

+func (s *Server) noteClientActivity(c *sclient) {
+	if !c.isDup.Get() {
+		// Fast path for clients that aren't in a dup set.
+		return
+	}
+	if c.isDisabled.Get() {
+		// If they're already disabled, no point checking more.
+		return
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	ds, ok := s.clients[c.key].(*dupClientSet)
+	if !ok {
+		// It became unduped in between the isDup fast path check above
+		// and the mutex check. Nothing to do.
+		return
+	}
+
+	if s.dupPolicy == lastWriterIsActive {
+		ds.last = c
+	} else if ds.last == nil {
+		// If we didn't have a primary, let the current
+		// speaker be the primary.
+		ds.last = c
+	}
+
+	if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c {
+		// The client c was the last client to make activity
+		// in this set and it was already recorded. Nothing to
+		// do.
+		return
+	}
+
+	// If we saw this connection send previously, then consider
+	// the group fighting and disable them all.
+	if s.dupPolicy == disableFighters {
+		for _, prior := range ds.sendHistory {
+			if prior == c {
+				ds.ForeachClient(func(c *sclient) {
+					c.isDisabled.Set(true)
+				})
+				break
+			}
+		}
+	}
+
+	// Append this client to the list of clients who spoke last.
+	ds.sendHistory = append(ds.sendHistory, c)
+}
+
 type serverInfo struct {
 	Version int `json:"version,omitempty"`
 }
@@ -987,6 +1214,8 @@ type sclient struct {
 	peerGone   chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
 	meshUpdate chan struct{}   // write request to write peerStateChange
 	canMesh    bool            // clientInfo had correct mesh token for inter-region routing
+	isDup      syncs.AtomicBool // whether more than 1 sclient for key is connected
+	isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups

 	// replaceLimiter controls how quickly two connections with
 	// the same client key can kick each other off the server by
@@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) }))
 	m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) }))
 	m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) }))
+	m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
+	m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
+	m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
 	m.Set("accepts", &s.accepts)
-	m.Set("clients_replaced", &s.clientsReplaced)
-	m.Set("clients_replace_limited", &s.clientsReplaceLimited)
-	m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping)
 	m.Set("bytes_received", &s.bytesRecv)
 	m.Set("bytes_sent", &s.bytesSent)
 	m.Set("packets_dropped", &s.packetsDropped)
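
The three new vars are registered alongside the existing ones, so they show up wherever the server's expvar set is already scraped. A minimal sketch of peeking at them in-process, assuming only the ExpVar() expvar.Var signature shown above; dumpDupStats is a hypothetical helper, not part of the patch:

// dumpDupStats logs the server's metric set; expvar.Var's String()
// renders it as JSON, which includes gauge_current_dup_client_keys,
// gauge_current_dup_client_conns, and counter_total_dup_client_conns.
func dumpDupStats(logf logger.Logf, s *Server) {
	var v expvar.Var = s.ExpVar()
	logf("derp metrics: %s", v.String())
}
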
|
@ -662,7 +662,7 @@ func pubAll(b byte) (ret key.Public) {
|
|||||||
|
|
||||||
func TestForwarderRegistration(t *testing.T) {
|
func TestForwarderRegistration(t *testing.T) {
|
||||||
s := &Server{
|
s := &Server{
|
||||||
clients: make(map[key.Public]*sclient),
|
clients: make(map[key.Public]clientSet),
|
||||||
clientsMesh: map[key.Public]PacketForwarder{},
|
clientsMesh: map[key.Public]PacketForwarder{},
|
||||||
}
|
}
|
||||||
want := func(want map[key.Public]PacketForwarder) {
|
want := func(want map[key.Public]PacketForwarder) {
|
||||||
@@ -745,7 +745,7 @@ func TestForwarderRegistration(t *testing.T) {
 		key:  u1,
 		logf: logger.Discard,
 	}
-	s.clients[u1] = u1c
+	s.clients[u1] = singleClient{u1c}
 	s.RemovePacketForwarder(u1, testFwd(100))
 	want(map[key.Public]PacketForwarder{
 		u1: nil,
@@ -765,7 +765,7 @@ func TestForwarderRegistration(t *testing.T) {
 	// Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard
 	// that they're also connected to a peer of ours. That sholdn't transition the forwarder
 	// from nil to the new one, not a multiForwarder.
-	s.clients[u1] = u1c
+	s.clients[u1] = singleClient{u1c}
 	s.clientsMesh[u1] = nil
 	want(map[key.Public]PacketForwarder{
 		u1: nil,
||||||
@ -851,125 +851,248 @@ func TestClientSendPong(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestServerReplaceClients(t *testing.T) {
|
func TestServerDupClients(t *testing.T) {
|
||||||
defer func() {
|
serverPriv := newPrivateKey(t)
|
||||||
timeSleep = time.Sleep
|
var s *Server
|
||||||
timeNow = time.Now
|
|
||||||
}()
|
|
||||||
|
|
||||||
var (
|
clientPriv := newPrivateKey(t)
|
||||||
mu sync.Mutex
|
clientPub := clientPriv.Public()
|
||||||
now = time.Unix(123, 0)
|
|
||||||
sleeps int
|
var c1, c2, c3 *sclient
|
||||||
slept time.Duration
|
var clientName map[*sclient]string
|
||||||
)
|
|
||||||
timeSleep = func(d time.Duration) {
|
// run starts a new test case and resets clients back to their zero values.
|
||||||
mu.Lock()
|
run := func(name string, dupPolicy dupPolicy, f func(t *testing.T)) {
|
||||||
defer mu.Unlock()
|
s = NewServer(serverPriv, t.Logf)
|
||||||
sleeps++
|
s.dupPolicy = dupPolicy
|
||||||
slept += d
|
c1 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c1: ")}
|
||||||
now = now.Add(d)
|
c2 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c2: ")}
|
||||||
|
c3 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c3: ")}
|
||||||
|
clientName = map[*sclient]string{
|
||||||
|
c1: "c1",
|
||||||
|
c2: "c2",
|
||||||
|
c3: "c3",
|
||||||
}
|
}
|
||||||
timeNow = func() time.Time {
|
t.Run(name, f)
|
||||||
mu.Lock()
|
|
||||||
defer mu.Unlock()
|
|
||||||
return now
|
|
||||||
}
|
}
|
||||||
|
runBothWays := func(name string, f func(t *testing.T)) {
|
||||||
serverPrivateKey := newPrivateKey(t)
|
run(name+"_disablefighters", disableFighters, f)
|
||||||
var logger logger.Logf = logger.Discard
|
run(name+"_lastwriteractive", lastWriterIsActive, f)
|
||||||
const debug = false
|
|
||||||
if debug {
|
|
||||||
logger = t.Logf
|
|
||||||
}
|
}
|
||||||
|
wantSingleClient := func(t *testing.T, want *sclient) {
|
||||||
s := NewServer(serverPrivateKey, logger)
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
priv := newPrivateKey(t)
|
|
||||||
|
|
||||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer ln.Close()
|
|
||||||
|
|
||||||
connNum := 0
|
|
||||||
connect := func() *Client {
|
|
||||||
connNum++
|
|
||||||
cout, err := net.Dial("tcp", ln.Addr().String())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cin, err := ln.Accept()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
|
|
||||||
go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", connNum))
|
|
||||||
|
|
||||||
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
|
|
||||||
c, err := NewClient(priv, cout, brw, logger)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("client %d: %v", connNum, err)
|
|
||||||
}
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
wantVar := func(v *expvar.Int, want int64) {
|
|
||||||
t.Helper()
|
t.Helper()
|
||||||
if got := v.Value(); got != want {
|
switch s := s.clients[want.key].(type) {
|
||||||
t.Errorf("got %d; want %d", got, want)
|
case singleClient:
|
||||||
}
|
if s.c != want {
|
||||||
}
|
t.Error("wrong single client")
|
||||||
|
|
||||||
wantClosed := func(c *Client) {
|
|
||||||
t.Helper()
|
|
||||||
for {
|
|
||||||
m, err := c.Recv()
|
|
||||||
if err != nil {
|
|
||||||
t.Logf("got expected error: %v", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch m.(type) {
|
if want.isDup.Get() {
|
||||||
case ServerInfoMessage:
|
t.Errorf("unexpected isDup on singleClient")
|
||||||
continue
|
}
|
||||||
|
if want.isDisabled.Get() {
|
||||||
|
t.Errorf("unexpected isDisabled on singleClient")
|
||||||
|
}
|
||||||
|
case nil:
|
||||||
|
t.Error("no clients for key")
|
||||||
|
case *dupClientSet:
|
||||||
|
t.Error("unexpected multiple clients for key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wantNoClient := func(t *testing.T) {
|
||||||
|
t.Helper()
|
||||||
|
switch s := s.clients[clientPub].(type) {
|
||||||
|
case nil:
|
||||||
|
// Good.
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
t.Fatalf("client got %T; wanted an error", m)
|
t.Errorf("got %T; want empty", s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
wantDupSet := func(t *testing.T) *dupClientSet {
|
||||||
|
t.Helper()
|
||||||
|
switch s := s.clients[clientPub].(type) {
|
||||||
|
case *dupClientSet:
|
||||||
|
return s
|
||||||
|
default:
|
||||||
|
t.Fatalf("wanted dup set; got %T", s)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wantActive := func(t *testing.T, want *sclient) {
|
||||||
|
t.Helper()
|
||||||
|
set, ok := s.clients[clientPub]
|
||||||
|
if !ok {
|
||||||
|
t.Error("no set for key")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
got := set.ActiveClient()
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("active client = %q; want %q", clientName[got], clientName[want])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkDup := func(t *testing.T, c *sclient, want bool) {
|
||||||
|
t.Helper()
|
||||||
|
if got := c.isDup.Get(); got != want {
|
||||||
|
t.Errorf("client %q isDup = %v; want %v", clientName[c], got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
checkDisabled := func(t *testing.T, c *sclient, want bool) {
|
||||||
|
t.Helper()
|
||||||
|
if got := c.isDisabled.Get(); got != want {
|
||||||
|
t.Errorf("client %q isDisabled = %v; want %v", clientName[c], got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wantDupConns := func(t *testing.T, want int) {
|
||||||
|
t.Helper()
|
||||||
|
if got := s.dupClientConns.Value(); got != int64(want) {
|
||||||
|
t.Errorf("dupClientConns = %v; want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wantDupKeys := func(t *testing.T, want int) {
|
||||||
|
t.Helper()
|
||||||
|
if got := s.dupClientKeys.Value(); got != int64(want) {
|
||||||
|
t.Errorf("dupClientKeys = %v; want %v", got, want)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c1 := connect()
|
// Common case: a single client comes and goes, with no dups.
|
||||||
waitConnect(t, c1)
|
runBothWays("one_comes_and_goes", func(t *testing.T) {
|
||||||
c2 := connect()
|
wantNoClient(t)
|
||||||
waitConnect(t, c2)
|
s.registerClient(c1)
|
||||||
wantVar(&s.clientsReplaced, 1)
|
wantSingleClient(t, c1)
|
||||||
wantClosed(c1)
|
s.unregisterClient(c1)
|
||||||
|
wantNoClient(t)
|
||||||
|
})
|
||||||
|
|
||||||
for i := 0; i < 100+5; i++ {
|
// A still somewhat common case: a single client was
|
||||||
c := connect()
|
// connected and then their wifi dies or laptop closes
|
||||||
defer c.nc.Close()
|
// or they switch networks and connect from a
|
||||||
if s.clientsReplaceLimited.Value() == 0 && i < 90 {
|
// different network. They have two connections but
|
||||||
continue
|
// it's not very bad. Only their new one is
|
||||||
}
|
// active. The last one, being dead, doesn't send and
|
||||||
t.Logf("for %d: replaced=%d, limited=%d, sleeping=%d", i,
|
// thus the new one doesn't get disabled.
|
||||||
s.clientsReplaced.Value(),
|
runBothWays("small_overlap_replacement", func(t *testing.T) {
|
||||||
s.clientsReplaceLimited.Value(),
|
wantNoClient(t)
|
||||||
s.clientsReplaceSleeping.Value(),
|
s.registerClient(c1)
|
||||||
)
|
wantSingleClient(t, c1)
|
||||||
}
|
wantActive(t, c1)
|
||||||
|
wantDupKeys(t, 0)
|
||||||
|
wantDupKeys(t, 0)
|
||||||
|
|
||||||
mu.Lock()
|
s.registerClient(c2) // wifi dies; c2 replacement connects
|
||||||
defer mu.Unlock()
|
wantDupSet(t)
|
||||||
if sleeps == 0 {
|
wantDupConns(t, 2)
|
||||||
t.Errorf("no sleeps")
|
wantDupKeys(t, 1)
|
||||||
}
|
checkDup(t, c1, true)
|
||||||
if slept == 0 {
|
checkDup(t, c2, true)
|
||||||
t.Errorf("total sleep duration was 0")
|
checkDisabled(t, c1, false)
|
||||||
}
|
checkDisabled(t, c2, false)
|
||||||
|
wantActive(t, c2) // sends go to the replacement
|
||||||
|
|
||||||
|
s.unregisterClient(c1) // c1 finally times out
|
||||||
|
wantSingleClient(t, c2)
|
||||||
|
checkDup(t, c2, false) // c2 is longer a dup
|
||||||
|
wantActive(t, c2)
|
||||||
|
wantDupConns(t, 0)
|
||||||
|
wantDupKeys(t, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Key cloning situation with concurrent clients, both trying
|
||||||
|
// to write.
|
||||||
|
run("concurrent_dups_get_disabled", disableFighters, func(t *testing.T) {
|
||||||
|
wantNoClient(t)
|
||||||
|
s.registerClient(c1)
|
||||||
|
wantSingleClient(t, c1)
|
||||||
|
wantActive(t, c1)
|
||||||
|
s.registerClient(c2)
|
||||||
|
wantDupSet(t)
|
||||||
|
wantDupKeys(t, 1)
|
||||||
|
wantDupConns(t, 2)
|
||||||
|
wantActive(t, c2)
|
||||||
|
checkDup(t, c1, true)
|
||||||
|
checkDup(t, c2, true)
|
||||||
|
checkDisabled(t, c1, false)
|
||||||
|
checkDisabled(t, c2, false)
|
||||||
|
|
||||||
|
s.noteClientActivity(c2)
|
||||||
|
checkDisabled(t, c1, false)
|
||||||
|
checkDisabled(t, c2, false)
|
||||||
|
s.noteClientActivity(c1)
|
||||||
|
checkDisabled(t, c1, true)
|
||||||
|
checkDisabled(t, c2, true)
|
||||||
|
wantActive(t, nil)
|
||||||
|
|
||||||
|
s.registerClient(c3)
|
||||||
|
wantActive(t, c3)
|
||||||
|
checkDisabled(t, c3, false)
|
||||||
|
wantDupKeys(t, 1)
|
||||||
|
wantDupConns(t, 3)
|
||||||
|
|
||||||
|
s.unregisterClient(c3)
|
||||||
|
wantActive(t, nil)
|
||||||
|
wantDupKeys(t, 1)
|
||||||
|
wantDupConns(t, 2)
|
||||||
|
|
||||||
|
s.unregisterClient(c2)
|
||||||
|
wantSingleClient(t, c1)
|
||||||
|
wantDupKeys(t, 0)
|
||||||
|
wantDupConns(t, 0)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Key cloning with an A->B->C->A series instead.
|
||||||
|
run("concurrent_dups_three_parties", disableFighters, func(t *testing.T) {
|
||||||
|
wantNoClient(t)
|
||||||
|
s.registerClient(c1)
|
||||||
|
s.registerClient(c2)
|
||||||
|
s.registerClient(c3)
|
||||||
|
s.noteClientActivity(c1)
|
||||||
|
checkDisabled(t, c1, true)
|
||||||
|
checkDisabled(t, c2, true)
|
||||||
|
checkDisabled(t, c3, true)
|
||||||
|
wantActive(t, nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
run("activity_promotes_primary_when_nil", disableFighters, func(t *testing.T) {
|
||||||
|
wantNoClient(t)
|
||||||
|
|
||||||
|
// Last registered client is the active one...
|
||||||
|
s.registerClient(c1)
|
||||||
|
wantActive(t, c1)
|
||||||
|
s.registerClient(c2)
|
||||||
|
wantActive(t, c2)
|
||||||
|
s.registerClient(c3)
|
||||||
|
s.noteClientActivity(c2)
|
||||||
|
wantActive(t, c3)
|
||||||
|
|
||||||
|
// But if the last one goes away, the one with the
|
||||||
|
// most recent activity wins.
|
||||||
|
s.unregisterClient(c3)
|
||||||
|
wantActive(t, c2)
|
||||||
|
})
|
||||||
|
|
||||||
|
run("concurrent_dups_three_parties_last_writer", lastWriterIsActive, func(t *testing.T) {
|
||||||
|
wantNoClient(t)
|
||||||
|
|
||||||
|
s.registerClient(c1)
|
||||||
|
wantActive(t, c1)
|
||||||
|
s.registerClient(c2)
|
||||||
|
wantActive(t, c2)
|
||||||
|
|
||||||
|
s.noteClientActivity(c1)
|
||||||
|
checkDisabled(t, c1, false)
|
||||||
|
checkDisabled(t, c2, false)
|
||||||
|
wantActive(t, c1)
|
||||||
|
|
||||||
|
s.noteClientActivity(c2)
|
||||||
|
checkDisabled(t, c1, false)
|
||||||
|
checkDisabled(t, c2, false)
|
||||||
|
wantActive(t, c2)
|
||||||
|
|
||||||
|
s.unregisterClient(c2)
|
||||||
|
checkDisabled(t, c1, false)
|
||||||
|
wantActive(t, c1)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLimiter(t *testing.T) {
|
func TestLimiter(t *testing.T) {
|
||||||
|

derp/dropreason_string.go:

@@ -18,11 +18,12 @@ func _() {
 	_ = x[dropReasonQueueHead-3]
 	_ = x[dropReasonQueueTail-4]
 	_ = x[dropReasonWriteError-5]
+	_ = x[dropReasonDupClient-6]
 }

-const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteError"
+const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient"

-var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59}
+var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68}

 func (i dropReason) String() string {
 	if i < 0 || i >= dropReason(len(_dropReason_index)-1) {