derp: use a dedicated queue for disco traffic.

Signed-off-by: David Anderson <danderson@tailscale.com>
This commit is contained in:
David Anderson 2021-07-12 14:01:51 -07:00 committed by Dave Anderson
parent 67158549ab
commit d98829583a

View File

@ -478,20 +478,21 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr) remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr)
c := &sclient{ c := &sclient{
connNum: connNum, connNum: connNum,
s: s, s: s,
key: clientKey, key: clientKey,
nc: nc, nc: nc,
br: br, br: br,
bw: bw, bw: bw,
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)), logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
done: ctx.Done(), done: ctx.Done(),
remoteAddr: remoteAddr, remoteAddr: remoteAddr,
remoteIPPort: remoteIPPort, remoteIPPort: remoteIPPort,
connectedAt: time.Now(), connectedAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth), sendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public), discoSendQueue: make(chan pkt, perClientSendQueueDepth),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
} }
if c.canMesh { if c.canMesh {
c.meshUpdate = make(chan struct{}) c.meshUpdate = make(chan struct{})
@ -739,6 +740,10 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
// Attempt to queue for sending up to 3 times. On each attempt, if // Attempt to queue for sending up to 3 times. On each attempt, if
// the queue is full, try to drop from queue head to prioritize // the queue is full, try to drop from queue head to prioritize
// fresher packets. // fresher packets.
sendQueue := dst.sendQueue
if disco.LooksLikeDiscoWrapper(p.bs) {
sendQueue = dst.discoSendQueue
}
for attempt := 0; attempt < 3; attempt++ { for attempt := 0; attempt < 3; attempt++ {
select { select {
case <-dst.done: case <-dst.done:
@ -747,13 +752,13 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
default: default:
} }
select { select {
case dst.sendQueue <- p: case sendQueue <- p:
return nil return nil
default: default:
} }
select { select {
case pkt := <-dst.sendQueue: case pkt := <-sendQueue:
s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead) s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
c.recordQueueTime(pkt.enqueuedAt) c.recordQueueTime(pkt.enqueuedAt)
default: default:
@ -932,19 +937,20 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct { type sclient struct {
// Static after construction. // Static after construction.
connNum int64 // process-wide unique counter, incremented each Accept connNum int64 // process-wide unique counter, incremented each Accept
s *Server s *Server
nc Conn nc Conn
key key.Public key key.Public
info clientInfo info clientInfo
logf logger.Logf logf logger.Logf
done <-chan struct{} // closed when connection closes done <-chan struct{} // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
sendQueue chan pkt // packets queued to this client; never closed sendQueue chan pkt // packets queued to this client; never closed
peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) discoSendQueue chan pkt // important packets queued to this client; never closed
meshUpdate chan struct{} // write request to write peerStateChange peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
canMesh bool // clientInfo had correct mesh token for inter-region routing meshUpdate chan struct{} // write request to write peerStateChange
canMesh bool // clientInfo had correct mesh token for inter-region routing
// Owned by run, not thread-safe. // Owned by run, not thread-safe.
br *bufio.Reader br *bufio.Reader
@ -1039,6 +1045,8 @@ func (c *sclient) sendLoop(ctx context.Context) error {
select { select {
case pkt := <-c.sendQueue: case pkt := <-c.sendQueue:
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone) c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone)
case pkt := <-c.discoSendQueue:
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone)
default: default:
return return
} }
@ -1069,6 +1077,10 @@ func (c *sclient) sendLoop(ctx context.Context) error {
werr = c.sendPacket(msg.src, msg.bs) werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt) c.recordQueueTime(msg.enqueuedAt)
continue continue
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case <-keepAliveTick.C: case <-keepAliveTick.C:
werr = c.sendKeepAlive() werr = c.sendKeepAlive()
continue continue
@ -1092,6 +1104,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
case msg := <-c.sendQueue: case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs) werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt) c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case <-keepAliveTick.C: case <-keepAliveTick.C:
werr = c.sendKeepAlive() werr = c.sendKeepAlive()
} }