diff --git a/cmd/derper/mesh.go b/cmd/derper/mesh.go index 4e5392732..ee1807f00 100644 --- a/cmd/derper/mesh.go +++ b/cmd/derper/mesh.go @@ -9,14 +9,12 @@ "fmt" "log" "net" - "net/netip" "strings" "time" "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/net/netmon" - "tailscale.com/types/key" "tailscale.com/types/logger" ) @@ -71,8 +69,8 @@ func startMeshWithHost(s *derp.Server, host string) error { return d.DialContext(ctx, network, addr) }) - add := func(k key.NodePublic, _ netip.AddrPort) { s.AddPacketForwarder(k, c) } - remove := func(k key.NodePublic) { s.RemovePacketForwarder(k, c) } + add := func(m derp.PeerPresentMessage) { s.AddPacketForwarder(m.Key, c) } + remove := func(m derp.PeerGoneMessage) { s.RemovePacketForwarder(m.Peer, c) } go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove) return nil } diff --git a/derp/derp.go b/derp/derp.go index 26a35a718..f9b070647 100644 --- a/derp/derp.go +++ b/derp/derp.go @@ -131,8 +131,9 @@ type PeerGoneReasonType byte const ( - PeerGoneReasonDisconnected = PeerGoneReasonType(0x00) // peer disconnected from this server - PeerGoneReasonNotHere = PeerGoneReasonType(0x01) // server doesn't know about this peer, unexpected + PeerGoneReasonDisconnected = PeerGoneReasonType(0x00) // peer disconnected from this server + PeerGoneReasonNotHere = PeerGoneReasonType(0x01) // server doesn't know about this peer, unexpected + PeerGoneReasonMeshConnBroke = PeerGoneReasonType(0xf0) // invented by Client.RunWatchConnectionLoop on disconnect; not sent on the wire ) // PeerPresentFlags is an optional byte of bit flags sent after a framePeerPresent message. diff --git a/derp/derphttp/derphttp_test.go b/derp/derphttp/derphttp_test.go index 5cfd745a8..cfb3676cd 100644 --- a/derp/derphttp/derphttp_test.go +++ b/derp/derphttp/derphttp_test.go @@ -11,7 +11,6 @@ "net" "net/http" "net/http/httptest" - "net/netip" "sync" "testing" "time" @@ -299,13 +298,13 @@ func TestBreakWatcherConnRecv(t *testing.T) { go func() { defer wg.Done() var peers int - add := func(k key.NodePublic, _ netip.AddrPort) { - t.Logf("add: %v", k.ShortString()) + add := func(m derp.PeerPresentMessage) { + t.Logf("add: %v", m.Key.ShortString()) peers++ // Signal that the watcher has run watcherChan <- peers } - remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- } + remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove) }() @@ -370,15 +369,15 @@ func TestBreakWatcherConn(t *testing.T) { go func() { defer wg.Done() var peers int - add := func(k key.NodePublic, _ netip.AddrPort) { - t.Logf("add: %v", k.ShortString()) + add := func(m derp.PeerPresentMessage) { + t.Logf("add: %v", m.Key.ShortString()) peers++ // Signal that the watcher has run watcherChan <- peers // Wait for breaker to run <-breakerChan } - remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- } + remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove) }() @@ -407,8 +406,8 @@ func TestBreakWatcherConn(t *testing.T) { } } -func noopAdd(key.NodePublic, netip.AddrPort) {} -func noopRemove(key.NodePublic) {} +func noopAdd(derp.PeerPresentMessage) {} +func noopRemove(derp.PeerGoneMessage) {} func TestRunWatchConnectionLoopServeConnect(t *testing.T) { defer func() { testHookWatchLookConnectResult = nil }() diff --git a/derp/derphttp/mesh_client.go b/derp/derphttp/mesh_client.go index 07aaa3c89..c7e48c733 100644 --- a/derp/derphttp/mesh_client.go +++ b/derp/derphttp/mesh_client.go @@ -5,7 +5,6 @@ import ( "context" - "net/netip" "sync" "time" @@ -35,9 +34,14 @@ // To force RunWatchConnectionLoop to return quickly, its ctx needs to be // closed, and c itself needs to be closed. // -// It is a fatal error to call this on an already-started Client withoutq having +// It is a fatal error to call this on an already-started Client without having // initialized Client.WatchConnectionChanges to true. -func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(key.NodePublic, netip.AddrPort), remove func(key.NodePublic)) { +// +// If the DERP connection breaks and reconnects, remove will be called for all +// previously seen peers, with Reason type PeerGoneReasonSynthetic. Those +// clients are likely still connected and their add message will appear after +// reconnect. +func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf, add func(derp.PeerPresentMessage), remove func(derp.PeerGoneMessage)) { if !c.WatchConnectionChanges { if c.isStarted() { panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.RunWatchConnectionLoop") @@ -62,7 +66,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key } logf("reconnected; clearing %d forwarding mappings", len(present)) for k := range present { - remove(k) + remove(derp.PeerGoneMessage{Peer: k, Reason: derp.PeerGoneReasonMeshConnBroke}) } present = map[key.NodePublic]bool{} } @@ -84,13 +88,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key }) defer timer.Stop() - updatePeer := func(k key.NodePublic, ipPort netip.AddrPort, isPresent bool) { - if isPresent { - add(k, ipPort) - } else { - remove(k) - } - + updatePeer := func(k key.NodePublic, isPresent bool) { mu.Lock() defer mu.Unlock() if isPresent { @@ -148,7 +146,8 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key } switch m := m.(type) { case derp.PeerPresentMessage: - updatePeer(m.Key, m.IPPort, true) + add(m) + updatePeer(m.Key, true) case derp.PeerGoneMessage: switch m.Reason { case derp.PeerGoneReasonDisconnected: @@ -160,7 +159,8 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key logf("Recv: peer %s not at server %s for unknown reason %v", key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason) } - updatePeer(key.NodePublic(m.Peer), netip.AddrPort{}, false) + remove(m) + updatePeer(m.Peer, false) default: continue }