diff --git a/derp/derp_server.go b/derp/derp_server.go index 96d5ae3fc..861a41dc8 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -478,20 +478,21 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr) c := &sclient{ - connNum: connNum, - s: s, - key: clientKey, - nc: nc, - br: br, - bw: bw, - logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), - done: ctx.Done(), - remoteAddr: remoteAddr, - remoteIPPort: remoteIPPort, - connectedAt: time.Now(), - sendQueue: make(chan pkt, perClientSendQueueDepth), - peerGone: make(chan key.Public), - canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, + connNum: connNum, + s: s, + key: clientKey, + nc: nc, + br: br, + bw: bw, + logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), + done: ctx.Done(), + remoteAddr: remoteAddr, + remoteIPPort: remoteIPPort, + connectedAt: time.Now(), + sendQueue: make(chan pkt, perClientSendQueueDepth), + discoSendQueue: make(chan pkt, perClientSendQueueDepth), + peerGone: make(chan key.Public), + canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, } if c.canMesh { c.meshUpdate = make(chan struct{}) @@ -739,6 +740,10 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { // Attempt to queue for sending up to 3 times. On each attempt, if // the queue is full, try to drop from queue head to prioritize // fresher packets. + sendQueue := dst.sendQueue + if disco.LooksLikeDiscoWrapper(p.bs) { + sendQueue = dst.discoSendQueue + } for attempt := 0; attempt < 3; attempt++ { select { case <-dst.done: @@ -747,13 +752,13 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { default: } select { - case dst.sendQueue <- p: + case sendQueue <- p: return nil default: } select { - case pkt := <-dst.sendQueue: + case pkt := <-sendQueue: s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead) c.recordQueueTime(pkt.enqueuedAt) default: @@ -932,19 +937,20 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) type sclient struct { // Static after construction. - connNum int64 // process-wide unique counter, incremented each Accept - s *Server - nc Conn - key key.Public - info clientInfo - logf logger.Logf - done <-chan struct{} // closed when connection closes - remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() - remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. - sendQueue chan pkt // packets queued to this client; never closed - peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) - meshUpdate chan struct{} // write request to write peerStateChange - canMesh bool // clientInfo had correct mesh token for inter-region routing + connNum int64 // process-wide unique counter, incremented each Accept + s *Server + nc Conn + key key.Public + info clientInfo + logf logger.Logf + done <-chan struct{} // closed when connection closes + remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() + remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. + sendQueue chan pkt // packets queued to this client; never closed + discoSendQueue chan pkt // important packets queued to this client; never closed + peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) + meshUpdate chan struct{} // write request to write peerStateChange + canMesh bool // clientInfo had correct mesh token for inter-region routing // Owned by run, not thread-safe. br *bufio.Reader @@ -1039,6 +1045,8 @@ func (c *sclient) sendLoop(ctx context.Context) error { select { case pkt := <-c.sendQueue: c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone) + case pkt := <-c.discoSendQueue: + c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone) default: return } @@ -1069,6 +1077,10 @@ func (c *sclient) sendLoop(ctx context.Context) error { werr = c.sendPacket(msg.src, msg.bs) c.recordQueueTime(msg.enqueuedAt) continue + case msg := <-c.discoSendQueue: + werr = c.sendPacket(msg.src, msg.bs) + c.recordQueueTime(msg.enqueuedAt) + continue case <-keepAliveTick.C: werr = c.sendKeepAlive() continue @@ -1092,6 +1104,9 @@ func (c *sclient) sendLoop(ctx context.Context) error { case msg := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs) c.recordQueueTime(msg.enqueuedAt) + case msg := <-c.discoSendQueue: + werr = c.sendPacket(msg.src, msg.bs) + c.recordQueueTime(msg.enqueuedAt) case <-keepAliveTick.C: werr = c.sendKeepAlive() }