diff --git a/derp/derp_server.go b/derp/derp_server.go index ab0ab0a90..8066b7f19 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -84,11 +84,19 @@ func init() { } const ( - perClientSendQueueDepth = 32 // packets buffered for sending - writeTimeout = 2 * time.Second - privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key + defaultPerClientSendQueueDepth = 32 // default packets buffered for sending + writeTimeout = 2 * time.Second + privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key ) +func getPerClientSendQueueDepth() int { + if v, ok := envknob.LookupInt("TS_DEBUG_DERP_PER_CLIENT_SEND_QUEUE_DEPTH"); ok { + return v + } + + return defaultPerClientSendQueueDepth +} + // dupPolicy is a temporary (2021-08-30) mechanism to change the policy // of how duplicate connection for the same key are handled. type dupPolicy int8 @@ -190,6 +198,9 @@ type Server struct { // maps from netip.AddrPort to a client's public key keyOfAddr map[netip.AddrPort]key.NodePublic + // Sets the client send queue depth for the server. + perClientSendQueueDepth int + clock tstime.Clock } @@ -377,6 +388,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco") s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other") + + s.perClientSendQueueDepth = getPerClientSendQueueDepth() return s } @@ -849,8 +862,8 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem done: ctx.Done(), remoteIPPort: remoteIPPort, connectedAt: s.clock.Now(), - sendQueue: make(chan pkt, perClientSendQueueDepth), - discoSendQueue: make(chan pkt, perClientSendQueueDepth), + sendQueue: make(chan pkt, s.perClientSendQueueDepth), + discoSendQueue: make(chan pkt, s.perClientSendQueueDepth), sendPongCh: make(chan [8]byte, 1), peerGone: make(chan peerGoneMsg), canMesh: s.isMeshPeer(clientInfo), diff --git a/derp/derp_test.go b/derp/derp_test.go index 9185194dd..f0fc52fe7 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -6,6 +6,7 @@ package derp import ( "bufio" "bytes" + "cmp" "context" "crypto/x509" "encoding/asn1" @@ -23,6 +24,7 @@ import ( "testing" "time" + qt "github.com/frankban/quicktest" "go4.org/mem" "golang.org/x/time/rate" "tailscale.com/disco" @@ -1598,3 +1600,29 @@ func TestServerRepliesToPing(t *testing.T) { } } } + +func TestGetPerClientSendQueueDepth(t *testing.T) { + c := qt.New(t) + envKey := "TS_DEBUG_DERP_PER_CLIENT_SEND_QUEUE_DEPTH" + + testCases := []struct { + envVal string + want int + }{ + // Empty case, envknob treats empty as missing also. + { + "", defaultPerClientSendQueueDepth, + }, + { + "64", 64, + }, + } + + for _, tc := range testCases { + t.Run(cmp.Or(tc.envVal, "empty"), func(t *testing.T) { + t.Setenv(envKey, tc.envVal) + val := getPerClientSendQueueDepth() + c.Assert(val, qt.Equals, tc.want) + }) + } +}