From 90a7d3066c17463162ad66f817bf0d54900aef3e Mon Sep 17 00:00:00 2001 From: Claire Wang Date: Thu, 27 Jul 2023 15:56:33 -0400 Subject: [PATCH] derp: use tstime (#8634) Updates #8587 Signed-off-by: Claire Wang --- cmd/derper/depaware.txt | 1 + cmd/tailscale/depaware.txt | 1 + derp/derp_client.go | 9 ++++++--- derp/derp_server.go | 24 ++++++++++++++---------- derp/derp_server_linux.go | 4 ++-- derp/derp_test.go | 11 +++++++---- derp/derphttp/derphttp_client.go | 10 +++++++--- derp/derphttp/mesh_client.go | 10 +++++----- 8 files changed, 43 insertions(+), 27 deletions(-) diff --git a/cmd/derper/depaware.txt b/cmd/derper/depaware.txt index c99cdb652..7dad61739 100644 --- a/cmd/derper/depaware.txt +++ b/cmd/derper/depaware.txt @@ -115,6 +115,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa tailscale.com/tailcfg from tailscale.com/client/tailscale+ tailscale.com/tka from tailscale.com/client/tailscale+ W tailscale.com/tsconst from tailscale.com/net/interfaces + tailscale.com/tstime from tailscale.com/derp+ 💣 tailscale.com/tstime/mono from tailscale.com/tstime/rate tailscale.com/tstime/rate from tailscale.com/wgengine/filter+ tailscale.com/tsweb from tailscale.com/cmd/derper diff --git a/cmd/tailscale/depaware.txt b/cmd/tailscale/depaware.txt index 9384ba84b..d22a34bde 100644 --- a/cmd/tailscale/depaware.txt +++ b/cmd/tailscale/depaware.txt @@ -108,6 +108,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep tailscale.com/tailcfg from tailscale.com/cmd/tailscale/cli+ tailscale.com/tka from tailscale.com/client/tailscale+ W tailscale.com/tsconst from tailscale.com/net/interfaces + tailscale.com/tstime from tailscale.com/derp+ 💣 tailscale.com/tstime/mono from tailscale.com/tstime/rate tailscale.com/tstime/rate from tailscale.com/wgengine/filter+ tailscale.com/types/dnstype from tailscale.com/tailcfg diff --git a/derp/derp_client.go b/derp/derp_client.go index 2889d81ab..b508dcd4e 100644 --- a/derp/derp_client.go +++ b/derp/derp_client.go @@ -17,6 +17,7 @@ "go4.org/mem" "golang.org/x/time/rate" "tailscale.com/syncs" + "tailscale.com/tstime" "tailscale.com/types/key" "tailscale.com/types/logger" ) @@ -40,6 +41,8 @@ type Client struct { // Owned by Recv: peeked int // bytes to discard on next Recv readErr syncs.AtomicValue[error] // sticky (set by Recv) + + clock tstime.Clock } // ClientOpt is an option passed to NewClient. @@ -103,6 +106,7 @@ func newClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf meshKey: opt.MeshKey, canAckPings: opt.CanAckPings, isProber: opt.IsProber, + clock: tstime.StdClock{}, } if opt.ServerPub.IsZero() { if err := c.recvServerKey(); err != nil { @@ -214,7 +218,7 @@ func (c *Client) send(dstKey key.NodePublic, pkt []byte) (ret error) { defer c.wmu.Unlock() if c.rate != nil { pktLen := frameHeaderLen + key.NodePublicRawLen + len(pkt) - if !c.rate.AllowN(time.Now(), pktLen) { + if !c.rate.AllowN(c.clock.Now(), pktLen) { return nil // drop } } @@ -244,7 +248,7 @@ func (c *Client) ForwardPacket(srcKey, dstKey key.NodePublic, pkt []byte) (err e c.wmu.Lock() defer c.wmu.Unlock() - timer := time.AfterFunc(5*time.Second, c.writeTimeoutFired) + timer := c.clock.AfterFunc(5*time.Second, c.writeTimeoutFired) defer timer.Stop() if err := writeFrameHeader(c.bw, frameForwardPacket, uint32(keyLen*2+len(pkt))); err != nil { @@ -457,7 +461,6 @@ func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err erro c.readErr.Store(err) } }() - for { c.nc.SetReadDeadline(time.Now().Add(timeout)) diff --git a/derp/derp_server.go b/derp/derp_server.go index 1ad5d25f3..d4e7fd598 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -39,6 +39,7 @@ "tailscale.com/envknob" "tailscale.com/metrics" "tailscale.com/syncs" + "tailscale.com/tstime" "tailscale.com/tstime/rate" "tailscale.com/types/key" "tailscale.com/types/logger" @@ -164,6 +165,8 @@ type Server struct { // maps from netip.AddrPort to a client's public key keyOfAddr map[netip.AddrPort]key.NodePublic + + clock tstime.Clock } // clientSet represents 1 or more *sclients. @@ -318,6 +321,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { avgQueueDuration: new(uint64), tcpRtt: metrics.LabelMap{Label: "le"}, keyOfAddr: map[netip.AddrPort]key.NodePublic{}, + clock: tstime.StdClock{}, } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") @@ -467,8 +471,8 @@ func (s *Server) initMetacert() { CommonName: fmt.Sprintf("derpkey%s", s.publicKey.UntypedHexString()), }, // Windows requires NotAfter and NotBefore set: - NotAfter: time.Now().Add(30 * 24 * time.Hour), - NotBefore: time.Now().Add(-30 * 24 * time.Hour), + NotAfter: s.clock.Now().Add(30 * 24 * time.Hour), + NotBefore: s.clock.Now().Add(-30 * 24 * time.Hour), // Per https://github.com/golang/go/issues/51759#issuecomment-1071147836, // macOS requires BasicConstraints when subject == issuer: BasicConstraintsValid: true, @@ -697,7 +701,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem done: ctx.Done(), remoteAddr: remoteAddr, remoteIPPort: remoteIPPort, - connectedAt: time.Now(), + connectedAt: s.clock.Now(), sendQueue: make(chan pkt, perClientSendQueueDepth), discoSendQueue: make(chan pkt, perClientSendQueueDepth), sendPongCh: make(chan [8]byte, 1), @@ -927,7 +931,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { return c.sendPkt(dst, pkt{ bs: contents, - enqueuedAt: time.Now(), + enqueuedAt: c.s.clock.Now(), src: srcKey, }) } @@ -994,7 +998,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { p := pkt{ bs: contents, - enqueuedAt: time.Now(), + enqueuedAt: c.s.clock.Now(), src: c.key, } return c.sendPkt(dst, p) @@ -1387,7 +1391,7 @@ func (c *sclient) setPreferred(v bool) { // graphs, so not important to miss a move. But it shouldn't: // the netcheck/re-STUNs in magicsock only happen about every // 30 seconds. - if time.Since(c.connectedAt) > 5*time.Second { + if c.s.clock.Since(c.connectedAt) > 5*time.Second { homeMove.Add(1) } } @@ -1401,7 +1405,7 @@ func expMovingAverage(prev, newValue, alpha float64) float64 { // recordQueueTime updates the average queue duration metric after a packet has been sent. func (c *sclient) recordQueueTime(enqueuedAt time.Time) { - elapsed := float64(time.Since(enqueuedAt).Milliseconds()) + elapsed := float64(c.s.clock.Since(enqueuedAt).Milliseconds()) for { old := atomic.LoadUint64(c.s.avgQueueDuration) newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1) @@ -1431,7 +1435,7 @@ func (c *sclient) sendLoop(ctx context.Context) error { }() jitter := time.Duration(rand.Intn(5000)) * time.Millisecond - keepAliveTick := time.NewTicker(keepAlive + jitter) + keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(keepAlive + jitter) defer keepAliveTick.Stop() var werr error // last write error @@ -1461,7 +1465,7 @@ func (c *sclient) sendLoop(ctx context.Context) error { case msg := <-c.sendPongCh: werr = c.sendPong(msg) continue - case <-keepAliveTick.C: + case <-keepAliveTickChannel: werr = c.sendKeepAlive() continue default: @@ -1490,7 +1494,7 @@ func (c *sclient) sendLoop(ctx context.Context) error { case msg := <-c.sendPongCh: werr = c.sendPong(msg) continue - case <-keepAliveTick.C: + case <-keepAliveTickChannel: werr = c.sendKeepAlive() } } diff --git a/derp/derp_server_linux.go b/derp/derp_server_linux.go index 03bb204ca..48da8ed30 100644 --- a/derp/derp_server_linux.go +++ b/derp/derp_server_linux.go @@ -27,13 +27,13 @@ func (c *sclient) statsLoop(ctx context.Context) error { const statsInterval = 10 * time.Second - ticker := time.NewTicker(statsInterval) + ticker, tickerChannel := c.s.clock.NewTicker(statsInterval) defer ticker.Stop() statsLoop: for { select { - case <-ticker.C: + case <-tickerChannel: rtt, err := tcpinfo.RTT(conn) if err != nil { continue statsLoop diff --git a/derp/derp_test.go b/derp/derp_test.go index 62d4e0a00..7d8ea5f80 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -27,6 +27,7 @@ "golang.org/x/time/rate" "tailscale.com/disco" "tailscale.com/net/memnet" + "tailscale.com/tstest" "tailscale.com/types/key" "tailscale.com/types/logger" ) @@ -990,9 +991,10 @@ func TestClientRecv(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &Client{ - nc: dummyNetConn{}, - br: bufio.NewReader(bytes.NewReader(tt.input)), - logf: t.Logf, + nc: dummyNetConn{}, + br: bufio.NewReader(bytes.NewReader(tt.input)), + logf: t.Logf, + clock: &tstest.Clock{}, } got, err := c.Recv() if err != nil { @@ -1435,7 +1437,8 @@ func (w *countWriter) ResetStats() { func TestClientSendRateLimiting(t *testing.T) { cw := new(countWriter) c := &Client{ - bw: bufio.NewWriter(cw), + bw: bufio.NewWriter(cw), + clock: &tstest.Clock{}, } c.setSendRateLimiter(ServerInfoMessage{}) diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go index 07317fcbf..bb3d44377 100644 --- a/derp/derphttp/derphttp_client.go +++ b/derp/derphttp/derphttp_client.go @@ -38,6 +38,7 @@ "tailscale.com/net/tshttpproxy" "tailscale.com/syncs" "tailscale.com/tailcfg" + "tailscale.com/tstime" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/util/cmpx" @@ -83,6 +84,7 @@ type Client struct { serverPubKey key.NodePublic tlsState *tls.ConnectionState pingOut map[derp.PingMessage]chan<- bool // chan to send to on pong + clock tstime.Clock } func (c *Client) String() string { @@ -101,6 +103,7 @@ func NewRegionClient(privateKey key.NodePrivate, logf logger.Logf, netMon *netmo getRegion: getRegion, ctx: ctx, cancelCtx: cancel, + clock: tstime.StdClock{}, } return c } @@ -108,7 +111,7 @@ func NewRegionClient(privateKey key.NodePrivate, logf logger.Logf, netMon *netmo // NewNetcheckClient returns a Client that's only able to have its DialRegionTLS method called. // It's used by the netcheck package. func NewNetcheckClient(logf logger.Logf) *Client { - return &Client{logf: logf} + return &Client{logf: logf, clock: tstime.StdClock{}} } // NewClient returns a new DERP-over-HTTP client. It connects lazily. @@ -129,6 +132,7 @@ func NewClient(privateKey key.NodePrivate, serverURL string, logf logger.Logf) ( url: u, ctx: ctx, cancelCtx: cancel, + clock: tstime.StdClock{}, } return c, nil } @@ -644,14 +648,14 @@ type res struct { nwait++ go func() { if proto == "tcp4" && c.preferIPv6() { - t := time.NewTimer(200 * time.Millisecond) + t, tChannel := c.clock.NewTimer(200 * time.Millisecond) select { case <-ctx.Done(): // Either user canceled original context, // it timed out, or the v6 dial succeeded. t.Stop() return - case <-t.C: + case <-tChannel: // Start v4 dial } } diff --git a/derp/derphttp/mesh_client.go b/derp/derphttp/mesh_client.go index 4454136ab..a978c034e 100644 --- a/derp/derphttp/mesh_client.go +++ b/derp/derphttp/mesh_client.go @@ -51,7 +51,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key present = map[key.NodePublic]bool{} } lastConnGen := 0 - lastStatus := time.Now() + lastStatus := c.clock.Now() logConnectedLocked := func() { if loggedConnected { return @@ -61,7 +61,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key } const logConnectedDelay = 200 * time.Millisecond - timer := time.AfterFunc(2*time.Second, func() { + timer := c.clock.AfterFunc(2*time.Second, func() { mu.Lock() defer mu.Unlock() logConnectedLocked() @@ -91,11 +91,11 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key } sleep := func(d time.Duration) { - t := time.NewTimer(d) + t, tChannel := c.clock.NewTimer(d) select { case <-ctx.Done(): t.Stop() - case <-t.C: + case <-tChannel: } } @@ -142,7 +142,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key default: continue } - if now := time.Now(); now.Sub(lastStatus) > statusInterval { + if now := c.clock.Now(); now.Sub(lastStatus) > statusInterval { lastStatus = now infoLogf("%d peers", len(present)) }