derp: accept dup clients without closing prior's connection

A public key should have at most one connection to a given
DERP node (or really: one connection to a node in a region).

But if people clone their machine keys (e.g. clone their VM, Raspberry
Pi SD card, etc), then we can get into a situation where a public key
is connected multiple times.

Originally, the DERP server handled this by just kicking out any prior
connection whenever a new one came. But this led to reconnect fights
where 2+ nodes were in hard loops trying to reconnect and kicking out
their peer.

Then a909d37a59f6e36f47209db4e6b16497715f8de9 tried to add rate
limiting to how often that dup-kicking can happen, but empirically it
just doesn't work and ~leaks a bunch of goroutines and TCP
connections, tying them up for an hour or more while more and more
accumulate and waste memory, mostly because we were doing a time.Sleep
forever while not reading from their TCP connections.

Instead, just accept multiple connections per public key but track
which is the most recent. And if two are both writing back & forth,
then optionally disable them both. That last part is only enabled in
tests for now. The current default policy is just last-sender-wins
while we gather the next round of stats.
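
As a rough illustration of the two policies described above, here is a
minimal standalone sketch. It is not the code in this commit: the names
conn, dupSet, add, noteSend and active are hypothetical, locking is
omitted, and (as a simplification) registering a connection is treated
as its first send.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for the server's per-connection state.
type conn struct {
	name     string
	disabled bool
}

// policy selects how duplicate connections for one key are handled.
type policy int

const (
	lastWriterWins  policy = iota // the most recent sender is active
	disableFighters               // interleaved senders all get disabled
)

// dupSet tracks every live connection for a single public key.
type dupSet struct {
	policy  policy
	conns   map[*conn]bool
	last    *conn   // current active candidate
	history []*conn // senders in order, adjacent duplicates collapsed
}

func newDupSet(p policy) *dupSet {
	return &dupSet{policy: p, conns: map[*conn]bool{}}
}

// add registers a new connection without closing earlier ones.
// The newest connection becomes the active candidate.
func (s *dupSet) add(c *conn) {
	s.conns[c] = true
	s.last = c
	s.history = append(s.history, c)
}

// noteSend records that c sent a frame and applies the policy.
func (s *dupSet) noteSend(c *conn) {
	if c.disabled {
		return
	}
	if s.policy == lastWriterWins || s.last == nil {
		s.last = c
	}
	if n := len(s.history); n > 0 && s.history[n-1] == c {
		return // c was already the most recent sender
	}
	if s.policy == disableFighters {
		for _, prior := range s.history {
			if prior == c {
				// c spoke, someone else spoke, and c spoke again:
				// treat the whole set as fighting.
				for m := range s.conns {
					m.disabled = true
				}
				break
			}
		}
	}
	s.history = append(s.history, c)
}

// active returns the connection that should receive frames, or nil.
func (s *dupSet) active() *conn {
	if s.last != nil && !s.last.disabled {
		return s.last
	}
	return nil
}

func main() {
	a, b := &conn{name: "a"}, &conn{name: "b"}
	lw := newDupSet(lastWriterWins)
	lw.add(a)
	lw.add(b)
	lw.noteSend(a)
	fmt.Println(lw.active().name) // prints "a"

	c, d := &conn{name: "c"}, &conn{name: "d"}
	df := newDupSet(disableFighters)
	df.add(c)
	df.add(d)
	df.noteSend(c)
	fmt.Println(df.active() == nil) // prints "true"
}
```

Under last-writer-wins nothing is ever disabled, so a stale connection
that never sends again simply stops mattering. Under the fighter policy
a returning sender disables the whole set, which is why that policy is
the stricter of the two.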

Updates #2751

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Brad Fitzpatrick 2021-08-30 11:16:11 -07:00 committed by Brad Fitzpatrick
parent debaaebf3b
commit 73280595a8
3 changed files with 532 additions and 179 deletions

@ -41,6 +41,7 @@
"tailscale.com/client/tailscale" "tailscale.com/client/tailscale"
"tailscale.com/disco" "tailscale.com/disco"
"tailscale.com/metrics" "tailscale.com/metrics"
"tailscale.com/syncs"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/pad32" "tailscale.com/types/pad32"
@ -77,6 +78,21 @@ func init() {
writeTimeout = 2 * time.Second writeTimeout = 2 * time.Second
) )
// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
// of how duplicate connections for the same key are handled.
type dupPolicy int8
const (
// lastWriterIsActive is a dupPolicy where the connection
// to most recently send traffic for a peer is the active one.
lastWriterIsActive dupPolicy = iota
// disableFighters is a dupPolicy that detects if peers
// are trying to send interleaved with each other and
// then disables all of them.
disableFighters
)
// Server is a DERP server. // Server is a DERP server.
type Server struct { type Server struct {
// WriteTimeout, if non-zero, specifies how long to wait // WriteTimeout, if non-zero, specifies how long to wait
@ -90,9 +106,9 @@ type Server struct {
meshKey string meshKey string
limitedLogf logger.Logf limitedLogf logger.Logf
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
dupPolicy dupPolicy
// Counters: // Counters:
_ pad32.Four
packetsSent, bytesSent expvar.Int packetsSent, bytesSent expvar.Int
packetsRecv, bytesRecv expvar.Int packetsRecv, bytesRecv expvar.Int
packetsRecvByKind metrics.LabelMap packetsRecvByKind metrics.LabelMap
@ -112,9 +128,9 @@ type Server struct {
accepts expvar.Int accepts expvar.Int
curClients expvar.Int curClients expvar.Int
curHomeClients expvar.Int // ones with preferred curHomeClients expvar.Int // ones with preferred
clientsReplaced expvar.Int dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
clientsReplaceLimited expvar.Int dupClientConns expvar.Int // current number of connections sharing a public key
clientsReplaceSleeping expvar.Int dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
unknownFrames expvar.Int unknownFrames expvar.Int
homeMovesIn expvar.Int // established clients announce home server moves in homeMovesIn expvar.Int // established clients announce home server moves in
homeMovesOut expvar.Int // established clients announce home server moves out homeMovesOut expvar.Int // established clients announce home server moves out
@ -130,7 +146,7 @@ type Server struct {
mu sync.Mutex mu sync.Mutex
closed bool closed bool
netConns map[Conn]chan struct{} // chan is closed when conn closes netConns map[Conn]chan struct{} // chan is closed when conn closes
clients map[key.Public]*sclient clients map[key.Public]clientSet
watchers map[*sclient]bool // mesh peer -> true watchers map[*sclient]bool // mesh peer -> true
// clientsMesh tracks all clients in the cluster, both locally // clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the // and to mesh peers. If the value is nil, that means the
@ -148,6 +164,112 @@ type Server struct {
keyOfAddr map[netaddr.IPPort]key.Public keyOfAddr map[netaddr.IPPort]key.Public
} }
// clientSet represents 1 or more *sclients.
//
// The two implementations are singleClient and *dupClientSet.
//
// In the common case, a client should only have one connection to the
// DERP server for a given key. When they're connected multiple times,
// we record their set of connections in dupClientSet and keep their
// connections open to make them happy (to keep them from spinning,
// etc) and keep track of which is the latest connection. If only the last
// is sending traffic, that last one is the active connection and it
// gets traffic. Otherwise, in the case of a cloned node key, the
// whole set of dups doesn't receive data frames.
//
// All methods should only be called while holding Server.mu.
//
// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
// "health_error" frame to them that'll communicate to the end users
// that they cloned a device key, and we'll also surface it in the
// admin panel, etc.
type clientSet interface {
// ActiveClient returns the most recently added client to
// the set, as long as it hasn't been disabled, in which
// case it returns nil.
ActiveClient() *sclient
// Len returns the number of clients in the set.
Len() int
// ForeachClient calls f for each client in the set.
ForeachClient(f func(*sclient))
}
// singleClient is a clientSet of a single connection.
// This is the common case.
type singleClient struct{ c *sclient }
func (s singleClient) ActiveClient() *sclient { return s.c }
func (s singleClient) Len() int { return 1 }
func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
// A dupClientSet is a clientSet of more than 1 connection.
//
// This can occur in some reasonable cases (temporarily while users
// are changing networks) or in the case of a cloned key. In the
// cloned key case, both peers are speaking and the clients get
// disabled.
//
// All fields are guarded by Server.mu.
type dupClientSet struct {
// set is the set of connected clients for sclient.key.
// The values are all true.
set map[*sclient]bool
// last is the most recent addition to set, or nil if the most
// recent one has since disconnected and nobody else has sent
// data since.
last *sclient
// sendHistory is a log of which members of set have sent
// frames to the derp server, with adjacent duplicates
// removed. When a member of set is removed, the same
// element(s) are removed from sendHistory.
sendHistory []*sclient
}
func (s *dupClientSet) ActiveClient() *sclient {
if s.last != nil && !s.last.isDisabled.Get() {
return s.last
}
return nil
}
func (s *dupClientSet) Len() int { return len(s.set) }
func (s *dupClientSet) ForeachClient(f func(*sclient)) {
for c := range s.set {
f(c)
}
}
// removeClient removes c from s and reports whether it was in s
// to begin with.
func (s *dupClientSet) removeClient(c *sclient) bool {
n := len(s.set)
delete(s.set, c)
if s.last == c {
s.last = nil
}
if len(s.set) == n {
return false
}
trim := s.sendHistory[:0]
for _, v := range s.sendHistory {
if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
trim = append(trim, v)
}
}
for i := len(trim); i < len(s.sendHistory); i++ {
s.sendHistory[i] = nil
}
s.sendHistory = trim
if s.last == nil && len(s.sendHistory) > 0 {
s.last = s.sendHistory[len(s.sendHistory)-1]
}
return true
}
// PacketForwarder is something that can forward packets. // PacketForwarder is something that can forward packets.
// //
// It's mostly an interface for circular dependency reasons; the // It's mostly an interface for circular dependency reasons; the
@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
packetsRecvByKind: metrics.LabelMap{Label: "kind"}, packetsRecvByKind: metrics.LabelMap{Label: "kind"},
packetsDroppedReason: metrics.LabelMap{Label: "reason"}, packetsDroppedReason: metrics.LabelMap{Label: "reason"},
packetsDroppedType: metrics.LabelMap{Label: "type"}, packetsDroppedType: metrics.LabelMap{Label: "type"},
clients: map[key.Public]*sclient{}, clients: map[key.Public]clientSet{},
clientsMesh: map[key.Public]PacketForwarder{}, clientsMesh: map[key.Public]PacketForwarder{},
netConns: map[Conn]chan struct{}{}, netConns: map[Conn]chan struct{}{},
memSys0: ms.Sys, memSys0: ms.Sys,
@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert }
// registerClient notes that client c is now authenticated and ready for packets. // registerClient notes that client c is now authenticated and ready for packets.
// //
// If c's public key was already connected with a different // If c.key is connected more than once, the earlier connection(s) are
// connection, the prior one is closed, unless it's fighting rapidly // placed in a non-active state where we read from them (primarily to
// with another client with the same key, in which case the returned // observe EOFs/timeouts) but won't send them frames on the assumption
// ok is false, and the caller should wait the provided duration // that they're dead.
// before trying again. func (s *Server) registerClient(c *sclient) {
func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
old := s.clients[c.key]
if old == nil { set := s.clients[c.key]
c.logf("adding connection") switch set := set.(type) {
} else { case nil:
// Take over the old rate limiter, discarding the one s.clients[c.key] = singleClient{c}
// our caller just made. case singleClient:
c.replaceLimiter = old.replaceLimiter s.dupClientKeys.Add(1)
if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() { s.dupClientConns.Add(2) // both old and new count
if d := rr.DelayFrom(timeNow()); d > 0 { s.dupClientConnTotal.Add(1)
s.clientsReplaceLimited.Add(1) old := set.ActiveClient()
return false, d old.isDup.Set(true)
} c.isDup.Set(true)
s.clients[c.key] = &dupClientSet{
last: c,
set: map[*sclient]bool{
old: true,
c: true,
},
sendHistory: []*sclient{old},
} }
s.clientsReplaced.Add(1) case *dupClientSet:
c.logf("adding connection, replacing %s", old.remoteAddr) s.dupClientConns.Add(1) // the gauge
go old.nc.Close() s.dupClientConnTotal.Add(1) // the counter
c.isDup.Set(true)
set.set[c] = true
set.last = c
set.sendHistory = append(set.sendHistory, c)
} }
s.clients[c.key] = c
if _, ok := s.clientsMesh[c.key]; !ok { if _, ok := s.clientsMesh[c.key]; !ok {
s.clientsMesh[c.key] = nil // just for varz of total users in cluster s.clientsMesh[c.key] = nil // just for varz of total users in cluster
} }
s.keyOfAddr[c.remoteIPPort] = c.key s.keyOfAddr[c.remoteIPPort] = c.key
s.curClients.Add(1) s.curClients.Add(1)
s.broadcastPeerStateChangeLocked(c.key, true) s.broadcastPeerStateChangeLocked(c.key, true)
return true, 0
} }
// broadcastPeerStateChangeLocked enqueues a message to all watchers // broadcastPeerStateChangeLocked enqueues a message to all watchers
@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
func (s *Server) unregisterClient(c *sclient) { func (s *Server) unregisterClient(c *sclient) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
cur := s.clients[c.key]
if cur == c { set := s.clients[c.key]
switch set := set.(type) {
case nil:
c.logf("[unexpected]; clients map is empty")
case singleClient:
c.logf("removing connection") c.logf("removing connection")
delete(s.clients, c.key) delete(s.clients, c.key)
if v, ok := s.clientsMesh[c.key]; ok && v == nil { if v, ok := s.clientsMesh[c.key]; ok && v == nil {
@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) {
s.notePeerGoneFromRegionLocked(c.key) s.notePeerGoneFromRegionLocked(c.key)
} }
s.broadcastPeerStateChangeLocked(c.key, false) s.broadcastPeerStateChangeLocked(c.key, false)
case *dupClientSet:
if set.removeClient(c) {
s.dupClientConns.Add(-1)
} else {
c.logf("[unexpected]; dup client set didn't shrink")
}
if set.Len() == 1 {
s.dupClientConns.Add(-1) // again; for the original one's
s.dupClientKeys.Add(-1)
var remain *sclient
for remain = range set.set {
break
}
if remain == nil {
panic("unexpected nil remain from single element dup set")
}
remain.isDisabled.Set(false)
remain.isDup.Set(false)
s.clients[c.key] = singleClient{remain}
}
} }
if c.canMesh { if c.canMesh {
delete(s.watchers, c) delete(s.watchers, c)
} }
@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
// or move them over to the active client (in case a replaced client // or move them over to the active client (in case a replaced client
// connection is being unregistered). // connection is being unregistered).
for pubKey, connNum := range s.sentTo[key] { for pubKey, connNum := range s.sentTo[key] {
if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum { set, ok := s.clients[pubKey]
go peer.requestPeerGoneWrite(key) if !ok {
continue
} }
set.ForeachClient(func(peer *sclient) {
if peer.connNum == connNum {
go peer.requestPeerGoneWrite(key)
}
})
} }
delete(s.sentTo, key) delete(s.sentTo, key)
} }
@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
discoSendQueue: make(chan pkt, perClientSendQueueDepth), discoSendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public), peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
// Allow kicking out previous connections once a
// minute, with a very high burst of 100. Once a
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
} }
if c.canMesh { if c.canMesh {
@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
c.info = *clientInfo c.info = *clientInfo
} }
for { s.registerClient(c)
ok, d := s.registerClient(c)
if ok {
break
}
s.clientsReplaceSleeping.Add(1)
timeSleep(d)
s.clientsReplaceSleeping.Add(-1)
}
defer s.unregisterClient(c) defer s.unregisterClient(c)
err = s.sendServerInfo(c.bw, clientKey) err = s.sendServerInfo(c.bw, clientKey)
@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error {
} }
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err) return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
} }
c.s.noteClientActivity(c)
switch ft { switch ft {
case frameNotePreferred: case frameNotePreferred:
err = c.handleFrameNotePreferred(ft, fl) err = c.handleFrameNotePreferred(ft, fl)
@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if target, ok := s.clients[targetKey]; ok { if set, ok := s.clients[targetKey]; ok {
c.logf("frameClosePeer closing peer %x", targetKey) if set.Len() == 1 {
go target.nc.Close() c.logf("frameClosePeer closing peer %x", targetKey)
} else {
c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len())
}
set.ForeachClient(func(target *sclient) {
go target.nc.Close()
})
} else { } else {
c.logf("frameClosePeer failed to find peer %x", targetKey) c.logf("frameClosePeer failed to find peer %x", targetKey)
} }
@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
} }
s.packetsForwardedIn.Add(1) s.packetsForwardedIn.Add(1)
var dstLen int
var dst *sclient
s.mu.Lock() s.mu.Lock()
dst := s.clients[dstKey] if set, ok := s.clients[dstKey]; ok {
dstLen = set.Len()
dst = set.ActiveClient()
}
if dst != nil { if dst != nil {
s.notePeerSendLocked(srcKey, dst) s.notePeerSendLocked(srcKey, dst)
} }
s.mu.Unlock() s.mu.Unlock()
if dst == nil { if dst == nil {
s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd) reason := dropReasonUnknownDestOnFwd
if dstLen > 1 {
reason = dropReasonDupClient
}
s.recordDrop(contents, srcKey, dstKey, reason)
return nil return nil
} }
@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
} }
var fwd PacketForwarder var fwd PacketForwarder
var dstLen int
var dst *sclient
s.mu.Lock() s.mu.Lock()
dst := s.clients[dstKey] if set, ok := s.clients[dstKey]; ok {
if dst == nil { dstLen = set.Len()
fwd = s.clientsMesh[dstKey] dst = set.ActiveClient()
} else { }
if dst != nil {
s.notePeerSendLocked(c.key, dst) s.notePeerSendLocked(c.key, dst)
} else if dstLen < 1 {
fwd = s.clientsMesh[dstKey]
} }
s.mu.Unlock() s.mu.Unlock()
@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
} }
return nil return nil
} }
s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest) reason := dropReasonUnknownDest
if dstLen > 1 {
reason = dropReasonDupClient
}
s.recordDrop(contents, c.key, dstKey, reason)
return nil return nil
} }
@ -741,6 +916,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
dropReasonQueueHead // destination queue is full, dropped packet at queue head dropReasonQueueHead // destination queue is full, dropped packet at queue head
dropReasonQueueTail // destination queue is full, dropped packet at queue tail dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed dropReasonWriteError // OS write() failed
dropReasonDupClient // the public key is connected 2+ times (active/active, fighting)
) )
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) { func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
return err return err
} }
func (s *Server) noteClientActivity(c *sclient) {
if !c.isDup.Get() {
// Fast path for clients that aren't in a dup set.
return
}
if c.isDisabled.Get() {
// If they're already disabled, no point checking more.
return
}
s.mu.Lock()
defer s.mu.Unlock()
ds, ok := s.clients[c.key].(*dupClientSet)
if !ok {
// It became unduped in between the isDup fast path check above
// and the mutex check. Nothing to do.
return
}
if s.dupPolicy == lastWriterIsActive {
ds.last = c
} else if ds.last == nil {
// If we didn't have a primary, let the current
// speaker be the primary.
ds.last = c
}
if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c {
// The client c was the last client to make activity
// in this set and it was already recorded. Nothing to
// do.
return
}
// If we saw this connection send previously, then consider
// the group fighting and disable them all.
if s.dupPolicy == disableFighters {
for _, prior := range ds.sendHistory {
if prior == c {
ds.ForeachClient(func(c *sclient) {
c.isDisabled.Set(true)
})
break
}
}
}
// Append this client to the list of clients who spoke last.
ds.sendHistory = append(ds.sendHistory, c)
}
type serverInfo struct { type serverInfo struct {
Version int `json:"version,omitempty"` Version int `json:"version,omitempty"`
} }
@ -979,14 +1206,16 @@ type sclient struct {
key key.Public key key.Public
info clientInfo info clientInfo
logf logger.Logf logf logger.Logf
done <-chan struct{} // closed when connection closes done <-chan struct{} // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
sendQueue chan pkt // packets queued to this client; never closed sendQueue chan pkt // packets queued to this client; never closed
discoSendQueue chan pkt // important packets queued to this client; never closed discoSendQueue chan pkt // important packets queued to this client; never closed
peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
meshUpdate chan struct{} // write request to write peerStateChange meshUpdate chan struct{} // write request to write peerStateChange
canMesh bool // clientInfo had correct mesh token for inter-region routing canMesh bool // clientInfo had correct mesh token for inter-region routing
isDup syncs.AtomicBool // whether more than 1 sclient for key is connected
isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups
// replaceLimiter controls how quickly two connections with // replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by // the same client key can kick each other off the server by
@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) })) m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) }))
m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) })) m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) }))
m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) })) m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) }))
m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
m.Set("accepts", &s.accepts) m.Set("accepts", &s.accepts)
m.Set("clients_replaced", &s.clientsReplaced)
m.Set("clients_replace_limited", &s.clientsReplaceLimited)
m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping)
m.Set("bytes_received", &s.bytesRecv) m.Set("bytes_received", &s.bytesRecv)
m.Set("bytes_sent", &s.bytesSent) m.Set("bytes_sent", &s.bytesSent)
m.Set("packets_dropped", &s.packetsDropped) m.Set("packets_dropped", &s.packetsDropped)

@ -662,7 +662,7 @@ func pubAll(b byte) (ret key.Public) {
func TestForwarderRegistration(t *testing.T) { func TestForwarderRegistration(t *testing.T) {
s := &Server{ s := &Server{
clients: make(map[key.Public]*sclient), clients: make(map[key.Public]clientSet),
clientsMesh: map[key.Public]PacketForwarder{}, clientsMesh: map[key.Public]PacketForwarder{},
} }
want := func(want map[key.Public]PacketForwarder) { want := func(want map[key.Public]PacketForwarder) {
@ -745,7 +745,7 @@ func TestForwarderRegistration(t *testing.T) {
key: u1, key: u1,
logf: logger.Discard, logf: logger.Discard,
} }
s.clients[u1] = u1c s.clients[u1] = singleClient{u1c}
s.RemovePacketForwarder(u1, testFwd(100)) s.RemovePacketForwarder(u1, testFwd(100))
want(map[key.Public]PacketForwarder{ want(map[key.Public]PacketForwarder{
u1: nil, u1: nil,
@ -765,7 +765,7 @@ func TestForwarderRegistration(t *testing.T) {
// Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard // Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard
// that they're also connected to a peer of ours. That shouldn't transition the forwarder // that they're also connected to a peer of ours. That shouldn't transition the forwarder
// from nil to the new one, not a multiForwarder. // from nil to the new one, not a multiForwarder.
s.clients[u1] = u1c s.clients[u1] = singleClient{u1c}
s.clientsMesh[u1] = nil s.clientsMesh[u1] = nil
want(map[key.Public]PacketForwarder{ want(map[key.Public]PacketForwarder{
u1: nil, u1: nil,
@ -851,125 +851,248 @@ func TestClientSendPong(t *testing.T) {
} }
func TestServerReplaceClients(t *testing.T) { func TestServerDupClients(t *testing.T) {
defer func() { serverPriv := newPrivateKey(t)
timeSleep = time.Sleep var s *Server
timeNow = time.Now
}()
var ( clientPriv := newPrivateKey(t)
mu sync.Mutex clientPub := clientPriv.Public()
now = time.Unix(123, 0)
sleeps int
slept time.Duration
)
timeSleep = func(d time.Duration) {
mu.Lock()
defer mu.Unlock()
sleeps++
slept += d
now = now.Add(d)
}
timeNow = func() time.Time {
mu.Lock()
defer mu.Unlock()
return now
}
serverPrivateKey := newPrivateKey(t) var c1, c2, c3 *sclient
var logger logger.Logf = logger.Discard var clientName map[*sclient]string
const debug = false
if debug {
logger = t.Logf
}
s := NewServer(serverPrivateKey, logger) // run starts a new test case and resets clients back to their zero values.
defer s.Close() run := func(name string, dupPolicy dupPolicy, f func(t *testing.T)) {
s = NewServer(serverPriv, t.Logf)
priv := newPrivateKey(t) s.dupPolicy = dupPolicy
c1 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c1: ")}
ln, err := net.Listen("tcp", "127.0.0.1:0") c2 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c2: ")}
if err != nil { c3 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c3: ")}
t.Fatal(err) clientName = map[*sclient]string{
} c1: "c1",
defer ln.Close() c2: "c2",
c3: "c3",
connNum := 0
connect := func() *Client {
connNum++
cout, err := net.Dial("tcp", ln.Addr().String())
if err != nil {
t.Fatal(err)
} }
t.Run(name, f)
cin, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", connNum))
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
c, err := NewClient(priv, cout, brw, logger)
if err != nil {
t.Fatalf("client %d: %v", connNum, err)
}
return c
} }
runBothWays := func(name string, f func(t *testing.T)) {
wantVar := func(v *expvar.Int, want int64) { run(name+"_disablefighters", disableFighters, f)
run(name+"_lastwriteractive", lastWriterIsActive, f)
}
wantSingleClient := func(t *testing.T, want *sclient) {
t.Helper() t.Helper()
if got := v.Value(); got != want { switch s := s.clients[want.key].(type) {
t.Errorf("got %d; want %d", got, want) case singleClient:
} if s.c != want {
} t.Error("wrong single client")
wantClosed := func(c *Client) {
t.Helper()
for {
m, err := c.Recv()
if err != nil {
t.Logf("got expected error: %v", err)
return return
} }
switch m.(type) { if want.isDup.Get() {
case ServerInfoMessage: t.Errorf("unexpected isDup on singleClient")
continue
default:
t.Fatalf("client got %T; wanted an error", m)
} }
if want.isDisabled.Get() {
t.Errorf("unexpected isDisabled on singleClient")
}
case nil:
t.Error("no clients for key")
case *dupClientSet:
t.Error("unexpected multiple clients for key")
}
}
wantNoClient := func(t *testing.T) {
t.Helper()
switch s := s.clients[clientPub].(type) {
case nil:
// Good.
return
default:
t.Errorf("got %T; want empty", s)
}
}
wantDupSet := func(t *testing.T) *dupClientSet {
t.Helper()
switch s := s.clients[clientPub].(type) {
case *dupClientSet:
return s
default:
t.Fatalf("wanted dup set; got %T", s)
return nil
}
}
wantActive := func(t *testing.T, want *sclient) {
t.Helper()
set, ok := s.clients[clientPub]
if !ok {
t.Error("no set for key")
return
}
got := set.ActiveClient()
if got != want {
t.Errorf("active client = %q; want %q", clientName[got], clientName[want])
}
}
checkDup := func(t *testing.T, c *sclient, want bool) {
t.Helper()
if got := c.isDup.Get(); got != want {
t.Errorf("client %q isDup = %v; want %v", clientName[c], got, want)
}
}
checkDisabled := func(t *testing.T, c *sclient, want bool) {
t.Helper()
if got := c.isDisabled.Get(); got != want {
t.Errorf("client %q isDisabled = %v; want %v", clientName[c], got, want)
}
}
wantDupConns := func(t *testing.T, want int) {
t.Helper()
if got := s.dupClientConns.Value(); got != int64(want) {
t.Errorf("dupClientConns = %v; want %v", got, want)
}
}
wantDupKeys := func(t *testing.T, want int) {
t.Helper()
if got := s.dupClientKeys.Value(); got != int64(want) {
t.Errorf("dupClientKeys = %v; want %v", got, want)
} }
} }
c1 := connect() // Common case: a single client comes and goes, with no dups.
waitConnect(t, c1) runBothWays("one_comes_and_goes", func(t *testing.T) {
c2 := connect() wantNoClient(t)
waitConnect(t, c2) s.registerClient(c1)
wantVar(&s.clientsReplaced, 1) wantSingleClient(t, c1)
wantClosed(c1) s.unregisterClient(c1)
wantNoClient(t)
})
for i := 0; i < 100+5; i++ { // A still somewhat common case: a single client was
c := connect() // connected and then their wifi dies or laptop closes
defer c.nc.Close() // or they switch networks and connect from a
if s.clientsReplaceLimited.Value() == 0 && i < 90 { // different network. They have two connections but
continue // it's not very bad. Only their new one is
} // active. The last one, being dead, doesn't send and
t.Logf("for %d: replaced=%d, limited=%d, sleeping=%d", i, // thus the new one doesn't get disabled.
s.clientsReplaced.Value(), runBothWays("small_overlap_replacement", func(t *testing.T) {
s.clientsReplaceLimited.Value(), wantNoClient(t)
s.clientsReplaceSleeping.Value(), s.registerClient(c1)
) wantSingleClient(t, c1)
} wantActive(t, c1)
wantDupConns(t, 0)
wantDupKeys(t, 0)
mu.Lock() s.registerClient(c2) // wifi dies; c2 replacement connects
defer mu.Unlock() wantDupSet(t)
if sleeps == 0 { wantDupConns(t, 2)
t.Errorf("no sleeps") wantDupKeys(t, 1)
} checkDup(t, c1, true)
if slept == 0 { checkDup(t, c2, true)
t.Errorf("total sleep duration was 0") checkDisabled(t, c1, false)
} checkDisabled(t, c2, false)
wantActive(t, c2) // sends go to the replacement
s.unregisterClient(c1) // c1 finally times out
wantSingleClient(t, c2)
checkDup(t, c2, false) // c2 is no longer a dup
wantActive(t, c2)
wantDupConns(t, 0)
wantDupKeys(t, 0)
})
// Key cloning situation with concurrent clients, both trying
// to write.
run("concurrent_dups_get_disabled", disableFighters, func(t *testing.T) {
wantNoClient(t)
s.registerClient(c1)
wantSingleClient(t, c1)
wantActive(t, c1)
s.registerClient(c2)
wantDupSet(t)
wantDupKeys(t, 1)
wantDupConns(t, 2)
wantActive(t, c2)
checkDup(t, c1, true)
checkDup(t, c2, true)
checkDisabled(t, c1, false)
checkDisabled(t, c2, false)
s.noteClientActivity(c2)
checkDisabled(t, c1, false)
checkDisabled(t, c2, false)
s.noteClientActivity(c1)
checkDisabled(t, c1, true)
checkDisabled(t, c2, true)
wantActive(t, nil)
s.registerClient(c3)
wantActive(t, c3)
checkDisabled(t, c3, false)
wantDupKeys(t, 1)
wantDupConns(t, 3)
s.unregisterClient(c3)
wantActive(t, nil)
wantDupKeys(t, 1)
wantDupConns(t, 2)
s.unregisterClient(c2)
wantSingleClient(t, c1)
wantDupKeys(t, 0)
wantDupConns(t, 0)
})
// Key cloning with an A->B->C->A series instead.
run("concurrent_dups_three_parties", disableFighters, func(t *testing.T) {
wantNoClient(t)
s.registerClient(c1)
s.registerClient(c2)
s.registerClient(c3)
s.noteClientActivity(c1)
checkDisabled(t, c1, true)
checkDisabled(t, c2, true)
checkDisabled(t, c3, true)
wantActive(t, nil)
})
run("activity_promotes_primary_when_nil", disableFighters, func(t *testing.T) {
wantNoClient(t)
// Last registered client is the active one...
s.registerClient(c1)
wantActive(t, c1)
s.registerClient(c2)
wantActive(t, c2)
s.registerClient(c3)
s.noteClientActivity(c2)
wantActive(t, c3)
// But if the last one goes away, the one with the
// most recent activity wins.
s.unregisterClient(c3)
wantActive(t, c2)
})
run("concurrent_dups_three_parties_last_writer", lastWriterIsActive, func(t *testing.T) {
wantNoClient(t)
s.registerClient(c1)
wantActive(t, c1)
s.registerClient(c2)
wantActive(t, c2)
s.noteClientActivity(c1)
checkDisabled(t, c1, false)
checkDisabled(t, c2, false)
wantActive(t, c1)
s.noteClientActivity(c2)
checkDisabled(t, c1, false)
checkDisabled(t, c2, false)
wantActive(t, c2)
s.unregisterClient(c2)
checkDisabled(t, c1, false)
wantActive(t, c1)
})
} }
func TestLimiter(t *testing.T) { func TestLimiter(t *testing.T) {

@ -18,11 +18,12 @@ func _() {
_ = x[dropReasonQueueHead-3] _ = x[dropReasonQueueHead-3]
_ = x[dropReasonQueueTail-4] _ = x[dropReasonQueueTail-4]
_ = x[dropReasonWriteError-5] _ = x[dropReasonWriteError-5]
_ = x[dropReasonDupClient-6]
} }
const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteError" const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient"
var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59} var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68}
func (i dropReason) String() string { func (i dropReason) String() string {
if i < 0 || i >= dropReason(len(_dropReason_index)-1) { if i < 0 || i >= dropReason(len(_dropReason_index)-1) {