mirror of
https://github.com/tailscale/tailscale.git
synced 2025-07-29 15:23:45 +00:00
ipn,net,tsnet,wgengine: make an eventbus mandatory where it is used
In the components where an event bus is already plumbed through, remove the exceptions that allow it to be omitted, and update all the tests that relied on those workarounds execute properly. This change applies only to the places where we're already using the bus; it does not enforce the existence of a bus in other components (yet), Updates #15160 Change-Id: Iebb92243caba82b5eb420c49fc3e089a77454f65 Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
parent
c87f44b687
commit
9ad2928f91
@ -377,7 +377,7 @@ func peersMap(s []tailcfg.NodeView) map[tailcfg.NodeID]tailcfg.NodeView {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestAllowExitNodeDNSProxyToServeName(t *testing.T) {
|
func TestAllowExitNodeDNSProxyToServeName(t *testing.T) {
|
||||||
b := &LocalBackend{}
|
b := newTestLocalBackend(t)
|
||||||
if b.allowExitNodeDNSProxyToServeName("google.com") {
|
if b.allowExitNodeDNSProxyToServeName("google.com") {
|
||||||
t.Fatal("unexpected true on backend with nil NetMap")
|
t.Fatal("unexpected true on backend with nil NetMap")
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,6 @@ import (
|
|||||||
"tailscale.com/util/clientmetric"
|
"tailscale.com/util/clientmetric"
|
||||||
"tailscale.com/util/deephash"
|
"tailscale.com/util/deephash"
|
||||||
"tailscale.com/util/dnsname"
|
"tailscale.com/util/dnsname"
|
||||||
"tailscale.com/util/eventbus"
|
|
||||||
"tailscale.com/util/goroutines"
|
"tailscale.com/util/goroutines"
|
||||||
"tailscale.com/util/httpm"
|
"tailscale.com/util/httpm"
|
||||||
"tailscale.com/util/mak"
|
"tailscale.com/util/mak"
|
||||||
@ -618,15 +617,7 @@ func (b *LocalBackend) currentNode() *nodeBackend {
|
|||||||
if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() {
|
if v := b.currentNodeAtomic.Load(); v != nil || !testenv.InTest() {
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
// Auto-init [nodeBackend] in tests for LocalBackend created without the
|
v := newNodeBackend(cmp.Or(b.ctx, context.Background()), b.sys.Bus.Get())
|
||||||
// NewLocalBackend() constructor. Same reasoning for checking b.sys.
|
|
||||||
var bus *eventbus.Bus
|
|
||||||
if b.sys == nil {
|
|
||||||
bus = eventbus.New()
|
|
||||||
} else {
|
|
||||||
bus = b.sys.Bus.Get()
|
|
||||||
}
|
|
||||||
v := newNodeBackend(cmp.Or(b.ctx, context.Background()), bus)
|
|
||||||
if b.currentNodeAtomic.CompareAndSwap(nil, v) {
|
if b.currentNodeAtomic.CompareAndSwap(nil, v) {
|
||||||
v.ready()
|
v.ready()
|
||||||
}
|
}
|
||||||
|
@ -463,6 +463,7 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend {
|
|||||||
var logf logger.Logf = logger.Discard
|
var logf logger.Logf = logger.Discard
|
||||||
if _, ok := sys.StateStore.GetOK(); !ok {
|
if _, ok := sys.StateStore.GetOK(); !ok {
|
||||||
sys.Set(new(mem.Store))
|
sys.Set(new(mem.Store))
|
||||||
|
t.Log("Added memory store for testing")
|
||||||
}
|
}
|
||||||
if _, ok := sys.Engine.GetOK(); !ok {
|
if _, ok := sys.Engine.GetOK(); !ok {
|
||||||
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
|
eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry(), sys.Bus.Get())
|
||||||
@ -471,6 +472,11 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend {
|
|||||||
}
|
}
|
||||||
t.Cleanup(eng.Close)
|
t.Cleanup(eng.Close)
|
||||||
sys.Set(eng)
|
sys.Set(eng)
|
||||||
|
t.Log("Added fake userspace engine for testing")
|
||||||
|
}
|
||||||
|
if _, ok := sys.Dialer.GetOK(); !ok {
|
||||||
|
sys.Set(tsdial.NewDialer(netmon.NewStatic()))
|
||||||
|
t.Log("Added static dialer for testing")
|
||||||
}
|
}
|
||||||
lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0)
|
lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"tailscale.com/net/tsdial"
|
"tailscale.com/net/tsdial"
|
||||||
"tailscale.com/tailcfg"
|
"tailscale.com/tailcfg"
|
||||||
"tailscale.com/tka"
|
"tailscale.com/tka"
|
||||||
|
"tailscale.com/tsd"
|
||||||
"tailscale.com/types/key"
|
"tailscale.com/types/key"
|
||||||
"tailscale.com/types/netmap"
|
"tailscale.com/types/netmap"
|
||||||
"tailscale.com/types/persist"
|
"tailscale.com/types/persist"
|
||||||
@ -935,18 +936,21 @@ func TestTKAForceDisable(t *testing.T) {
|
|||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
cc := fakeControlClient(t, client)
|
cc := fakeControlClient(t, client)
|
||||||
b := LocalBackend{
|
sys := tsd.NewSystem()
|
||||||
varRoot: temp,
|
sys.Set(pm.Store())
|
||||||
cc: cc,
|
|
||||||
ccAuto: cc,
|
b := newTestLocalBackendWithSys(t, sys)
|
||||||
logf: t.Logf,
|
b.SetVarRoot(temp)
|
||||||
tka: &tkaState{
|
b.SetControlClientGetterForTesting(func(controlclient.Options) (controlclient.Client, error) {
|
||||||
|
return cc, nil
|
||||||
|
})
|
||||||
|
b.mu.Lock()
|
||||||
|
b.tka = &tkaState{
|
||||||
authority: authority,
|
authority: authority,
|
||||||
storage: chonk,
|
storage: chonk,
|
||||||
},
|
|
||||||
pm: pm,
|
|
||||||
store: pm.Store(),
|
|
||||||
}
|
}
|
||||||
|
b.pm = pm
|
||||||
|
b.mu.Unlock()
|
||||||
|
|
||||||
if err := b.NetworkLockForceLocalDisable(); err != nil {
|
if err := b.NetworkLockForceLocalDisable(); err != nil {
|
||||||
t.Fatalf("NetworkLockForceLocalDisable() failed: %v", err)
|
t.Fatalf("NetworkLockForceLocalDisable() failed: %v", err)
|
||||||
|
@ -21,10 +21,10 @@ import (
|
|||||||
"tailscale.com/ipn"
|
"tailscale.com/ipn"
|
||||||
"tailscale.com/ipn/store/mem"
|
"tailscale.com/ipn/store/mem"
|
||||||
"tailscale.com/tailcfg"
|
"tailscale.com/tailcfg"
|
||||||
|
"tailscale.com/tsd"
|
||||||
"tailscale.com/tstest"
|
"tailscale.com/tstest"
|
||||||
"tailscale.com/types/logger"
|
"tailscale.com/types/logger"
|
||||||
"tailscale.com/types/netmap"
|
"tailscale.com/types/netmap"
|
||||||
"tailscale.com/util/eventbus"
|
|
||||||
"tailscale.com/util/must"
|
"tailscale.com/util/must"
|
||||||
"tailscale.com/util/usermetric"
|
"tailscale.com/util/usermetric"
|
||||||
"tailscale.com/wgengine"
|
"tailscale.com/wgengine"
|
||||||
@ -156,10 +156,9 @@ func TestHandlePeerAPI(t *testing.T) {
|
|||||||
selfNode.CapMap = tailcfg.NodeCapMap{tailcfg.CapabilityDebug: nil}
|
selfNode.CapMap = tailcfg.NodeCapMap{tailcfg.CapabilityDebug: nil}
|
||||||
}
|
}
|
||||||
var e peerAPITestEnv
|
var e peerAPITestEnv
|
||||||
lb := &LocalBackend{
|
lb := newTestLocalBackend(t)
|
||||||
logf: e.logBuf.Logf,
|
lb.logf = e.logBuf.Logf
|
||||||
clock: &tstest.Clock{},
|
lb.clock = &tstest.Clock{}
|
||||||
}
|
|
||||||
lb.currentNode().SetNetMap(&netmap.NetworkMap{SelfNode: selfNode.View()})
|
lb.currentNode().SetNetMap(&netmap.NetworkMap{SelfNode: selfNode.View()})
|
||||||
e.ph = &peerAPIHandler{
|
e.ph = &peerAPIHandler{
|
||||||
isSelf: tt.isSelf,
|
isSelf: tt.isSelf,
|
||||||
@ -195,20 +194,20 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) {
|
|||||||
h.isSelf = false
|
h.isSelf = false
|
||||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||||
|
|
||||||
bus := eventbus.New()
|
sys := tsd.NewSystem()
|
||||||
defer bus.Close()
|
t.Cleanup(sys.Bus.Get().Close)
|
||||||
|
|
||||||
ht := new(health.Tracker)
|
ht := new(health.Tracker)
|
||||||
reg := new(usermetric.Registry)
|
|
||||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
|
|
||||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||||
h.ps = &peerAPIServer{
|
reg := new(usermetric.Registry)
|
||||||
b: &LocalBackend{
|
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||||
e: eng,
|
sys.Set(pm.Store())
|
||||||
pm: pm,
|
sys.Set(eng)
|
||||||
store: pm.Store(),
|
|
||||||
},
|
b := newTestLocalBackendWithSys(t, sys)
|
||||||
}
|
b.pm = pm
|
||||||
|
|
||||||
|
h.ps = &peerAPIServer{b: b}
|
||||||
if h.ps.b.OfferingExitNode() {
|
if h.ps.b.OfferingExitNode() {
|
||||||
t.Fatal("unexpectedly offering exit node")
|
t.Fatal("unexpectedly offering exit node")
|
||||||
}
|
}
|
||||||
@ -250,12 +249,12 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
|
|||||||
var h peerAPIHandler
|
var h peerAPIHandler
|
||||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||||
|
|
||||||
bus := eventbus.New()
|
sys := tsd.NewSystem()
|
||||||
defer bus.Close()
|
t.Cleanup(sys.Bus.Get().Close)
|
||||||
|
|
||||||
ht := new(health.Tracker)
|
ht := new(health.Tracker)
|
||||||
reg := new(usermetric.Registry)
|
reg := new(usermetric.Registry)
|
||||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
|
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||||
var a *appc.AppConnector
|
var a *appc.AppConnector
|
||||||
if shouldStore {
|
if shouldStore {
|
||||||
@ -263,16 +262,14 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
a = appc.NewAppConnector(t.Logf, &appctest.RouteCollector{}, nil, nil)
|
a = appc.NewAppConnector(t.Logf, &appctest.RouteCollector{}, nil, nil)
|
||||||
}
|
}
|
||||||
h.ps = &peerAPIServer{
|
sys.Set(pm.Store())
|
||||||
b: &LocalBackend{
|
sys.Set(eng)
|
||||||
e: eng,
|
|
||||||
pm: pm,
|
|
||||||
store: pm.Store(),
|
|
||||||
// configure as an app connector just to enable the API.
|
|
||||||
appConnector: a,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
b := newTestLocalBackendWithSys(t, sys)
|
||||||
|
b.pm = pm
|
||||||
|
b.appConnector = a // configure as an app connector just to enable the API.
|
||||||
|
|
||||||
|
h.ps = &peerAPIServer{b: b}
|
||||||
h.ps.resolver = &fakeResolver{build: func(b *dnsmessage.Builder) {
|
h.ps.resolver = &fakeResolver{build: func(b *dnsmessage.Builder) {
|
||||||
b.CNAMEResource(
|
b.CNAMEResource(
|
||||||
dnsmessage.ResourceHeader{
|
dnsmessage.ResourceHeader{
|
||||||
@ -326,27 +323,29 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) {
|
|||||||
var h peerAPIHandler
|
var h peerAPIHandler
|
||||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||||
|
|
||||||
bus := eventbus.New()
|
sys := tsd.NewSystem()
|
||||||
defer bus.Close()
|
t.Cleanup(sys.Bus.Get().Close)
|
||||||
|
|
||||||
rc := &appctest.RouteCollector{}
|
rc := &appctest.RouteCollector{}
|
||||||
ht := new(health.Tracker)
|
ht := new(health.Tracker)
|
||||||
reg := new(usermetric.Registry)
|
|
||||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
|
|
||||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||||
|
|
||||||
|
reg := new(usermetric.Registry)
|
||||||
|
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||||
var a *appc.AppConnector
|
var a *appc.AppConnector
|
||||||
if shouldStore {
|
if shouldStore {
|
||||||
a = appc.NewAppConnector(t.Logf, rc, &appc.RouteInfo{}, fakeStoreRoutes)
|
a = appc.NewAppConnector(t.Logf, rc, &appc.RouteInfo{}, fakeStoreRoutes)
|
||||||
} else {
|
} else {
|
||||||
a = appc.NewAppConnector(t.Logf, rc, nil, nil)
|
a = appc.NewAppConnector(t.Logf, rc, nil, nil)
|
||||||
}
|
}
|
||||||
h.ps = &peerAPIServer{
|
sys.Set(pm.Store())
|
||||||
b: &LocalBackend{
|
sys.Set(eng)
|
||||||
e: eng,
|
|
||||||
pm: pm,
|
b := newTestLocalBackendWithSys(t, sys)
|
||||||
store: pm.Store(),
|
b.pm = pm
|
||||||
appConnector: a,
|
b.appConnector = a
|
||||||
},
|
|
||||||
}
|
h.ps = &peerAPIServer{b: b}
|
||||||
h.ps.b.appConnector.UpdateDomains([]string{"example.com"})
|
h.ps.b.appConnector.UpdateDomains([]string{"example.com"})
|
||||||
h.ps.b.appConnector.Wait(ctx)
|
h.ps.b.appConnector.Wait(ctx)
|
||||||
|
|
||||||
@ -393,12 +392,13 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
|
|||||||
var h peerAPIHandler
|
var h peerAPIHandler
|
||||||
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345")
|
||||||
|
|
||||||
bus := eventbus.New()
|
sys := tsd.NewSystem()
|
||||||
defer bus.Close()
|
t.Cleanup(sys.Bus.Get().Close)
|
||||||
|
|
||||||
ht := new(health.Tracker)
|
ht := new(health.Tracker)
|
||||||
reg := new(usermetric.Registry)
|
reg := new(usermetric.Registry)
|
||||||
rc := &appctest.RouteCollector{}
|
rc := &appctest.RouteCollector{}
|
||||||
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, bus)
|
eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg, sys.Bus.Get(), sys.Set)
|
||||||
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht))
|
||||||
var a *appc.AppConnector
|
var a *appc.AppConnector
|
||||||
if shouldStore {
|
if shouldStore {
|
||||||
@ -406,14 +406,14 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) {
|
|||||||
} else {
|
} else {
|
||||||
a = appc.NewAppConnector(t.Logf, rc, nil, nil)
|
a = appc.NewAppConnector(t.Logf, rc, nil, nil)
|
||||||
}
|
}
|
||||||
h.ps = &peerAPIServer{
|
sys.Set(pm.Store())
|
||||||
b: &LocalBackend{
|
sys.Set(eng)
|
||||||
e: eng,
|
|
||||||
pm: pm,
|
b := newTestLocalBackendWithSys(t, sys)
|
||||||
store: pm.Store(),
|
b.pm = pm
|
||||||
appConnector: a,
|
b.appConnector = a
|
||||||
},
|
|
||||||
}
|
h.ps = &peerAPIServer{b: b}
|
||||||
h.ps.b.appConnector.UpdateDomains([]string{"www.example.com"})
|
h.ps.b.appConnector.UpdateDomains([]string{"www.example.com"})
|
||||||
h.ps.b.appConnector.Wait(ctx)
|
h.ps.b.appConnector.Wait(ctx)
|
||||||
|
|
||||||
|
@ -263,16 +263,21 @@ func (d *TestIGD) handlePCPQuery(pkt []byte, src netip.AddrPort) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newTestClient configures a new test client connected to igd for mapping updates.
|
// newTestClient configures a new test client connected to igd for mapping updates.
|
||||||
// If bus != nil, update events are published to it.
|
// If bus == nil, a new empty event bus is constructed that is cleaned up when t exits.
|
||||||
// A cleanup for the resulting client is added to t.
|
// A cleanup for the resulting client is also added to t.
|
||||||
func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client {
|
func newTestClient(t *testing.T, igd *TestIGD, bus *eventbus.Bus) *Client {
|
||||||
|
if bus == nil {
|
||||||
|
bus = eventbus.New()
|
||||||
|
t.Log("Created empty event bus for test client")
|
||||||
|
t.Cleanup(bus.Close)
|
||||||
|
}
|
||||||
var c *Client
|
var c *Client
|
||||||
c = NewClient(Config{
|
c = NewClient(Config{
|
||||||
Logf: tstest.WhileTestRunningLogger(t),
|
Logf: tstest.WhileTestRunningLogger(t),
|
||||||
NetMon: netmon.NewStatic(),
|
NetMon: netmon.NewStatic(),
|
||||||
ControlKnobs: new(controlknobs.Knobs),
|
ControlKnobs: new(controlknobs.Knobs),
|
||||||
EventBus: bus,
|
EventBus: bus,
|
||||||
OnChange: func() {
|
OnChange: func() { // TODO(creachadair): Remove.
|
||||||
t.Logf("port map changed")
|
t.Logf("port map changed")
|
||||||
t.Logf("have mapping: %v", c.HaveMapping())
|
t.Logf("have mapping: %v", c.HaveMapping())
|
||||||
},
|
},
|
||||||
|
@ -85,7 +85,7 @@ const trustServiceStillAvailableDuration = 10 * time.Minute
|
|||||||
|
|
||||||
// Client is a port mapping client.
|
// Client is a port mapping client.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
// The following two fields must either both be nil, or both non-nil.
|
// The following two fields must both be non-nil.
|
||||||
// Both are immutable after construction.
|
// Both are immutable after construction.
|
||||||
pubClient *eventbus.Client
|
pubClient *eventbus.Client
|
||||||
updates *eventbus.Publisher[Mapping]
|
updates *eventbus.Publisher[Mapping]
|
||||||
@ -238,8 +238,11 @@ type Config struct {
|
|||||||
// NewClient constructs a new portmapping [Client] from c. It will panic if any
|
// NewClient constructs a new portmapping [Client] from c. It will panic if any
|
||||||
// required parameters are omitted.
|
// required parameters are omitted.
|
||||||
func NewClient(c Config) *Client {
|
func NewClient(c Config) *Client {
|
||||||
if c.NetMon == nil {
|
switch {
|
||||||
panic("nil netMon")
|
case c.NetMon == nil:
|
||||||
|
panic("nil NetMon")
|
||||||
|
case c.EventBus == nil:
|
||||||
|
panic("nil EventBus")
|
||||||
}
|
}
|
||||||
ret := &Client{
|
ret := &Client{
|
||||||
logf: c.Logf,
|
logf: c.Logf,
|
||||||
@ -248,10 +251,8 @@ func NewClient(c Config) *Client {
|
|||||||
onChange: c.OnChange,
|
onChange: c.OnChange,
|
||||||
controlKnobs: c.ControlKnobs,
|
controlKnobs: c.ControlKnobs,
|
||||||
}
|
}
|
||||||
if c.EventBus != nil {
|
|
||||||
ret.pubClient = c.EventBus.Client("portmapper")
|
ret.pubClient = c.EventBus.Client("portmapper")
|
||||||
ret.updates = eventbus.Publish[Mapping](ret.pubClient)
|
ret.updates = eventbus.Publish[Mapping](ret.pubClient)
|
||||||
}
|
|
||||||
if ret.logf == nil {
|
if ret.logf == nil {
|
||||||
ret.logf = logger.Discard
|
ret.logf = logger.Discard
|
||||||
}
|
}
|
||||||
@ -286,10 +287,9 @@ func (c *Client) Close() error {
|
|||||||
}
|
}
|
||||||
c.closed = true
|
c.closed = true
|
||||||
c.invalidateMappingsLocked(true)
|
c.invalidateMappingsLocked(true)
|
||||||
if c.updates != nil {
|
|
||||||
c.updates.Close()
|
c.updates.Close()
|
||||||
c.pubClient.Close()
|
c.pubClient.Close()
|
||||||
}
|
|
||||||
// TODO: close some future ever-listening UDP socket(s),
|
// TODO: close some future ever-listening UDP socket(s),
|
||||||
// waiting for multicast announcements from router.
|
// waiting for multicast announcements from router.
|
||||||
return nil
|
return nil
|
||||||
@ -508,14 +508,14 @@ func (c *Client) createMapping() {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if c.updates != nil {
|
|
||||||
c.updates.Publish(Mapping{
|
c.updates.Publish(Mapping{
|
||||||
External: mapping.External(),
|
External: mapping.External(),
|
||||||
Type: mapping.MappingType(),
|
Type: mapping.MappingType(),
|
||||||
GoodUntil: mapping.GoodUntil(),
|
GoodUntil: mapping.GoodUntil(),
|
||||||
})
|
})
|
||||||
}
|
// TODO(creachadair): Remove this entirely once there are no longer any
|
||||||
if c.onChange != nil && c.pubClient == nil {
|
// places where the callback is set.
|
||||||
|
if c.onChange != nil {
|
||||||
go c.onChange()
|
go c.onChange()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -291,6 +291,9 @@ func NewServer(logf logger.Logf, port int, overrideAddrs []netip.Addr) (s *Serve
|
|||||||
s.vniPool = append(s.vniPool, uint32(i))
|
s.vniPool = append(s.vniPool, uint32(i))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(creachadair): Find a way to plumb this in during initialization.
|
||||||
|
// As-written, messages published here will not be seen by other components
|
||||||
|
// in a running client.
|
||||||
bus := eventbus.New()
|
bus := eventbus.New()
|
||||||
s.bus = bus
|
s.bus = bus
|
||||||
netMon, err := netmon.New(s.bus, logf)
|
netMon, err := netmon.New(s.bus, logf)
|
||||||
|
@ -435,10 +435,7 @@ func (s *Server) Close() error {
|
|||||||
ln.closeLocked()
|
ln.closeLocked()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
s.sys.Bus.Get().Close()
|
||||||
if bus := s.sys.Bus.Get(); bus != nil {
|
|
||||||
bus.Close()
|
|
||||||
}
|
|
||||||
s.closed = true
|
s.closed = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -715,8 +715,11 @@ func (c *Conn) Synchronize() {
|
|||||||
// As the set of possible endpoints for a Conn changes, the
|
// As the set of possible endpoints for a Conn changes, the
|
||||||
// callback opts.EndpointsFunc is called.
|
// callback opts.EndpointsFunc is called.
|
||||||
func NewConn(opts Options) (*Conn, error) {
|
func NewConn(opts Options) (*Conn, error) {
|
||||||
if opts.NetMon == nil {
|
switch {
|
||||||
|
case opts.NetMon == nil:
|
||||||
return nil, errors.New("magicsock.Options.NetMon must be non-nil")
|
return nil, errors.New("magicsock.Options.NetMon must be non-nil")
|
||||||
|
case opts.EventBus == nil:
|
||||||
|
return nil, errors.New("magicsock.Options.EventBus must be non-nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
c := newConn(opts.logf())
|
c := newConn(opts.logf())
|
||||||
@ -729,7 +732,6 @@ func NewConn(opts Options) (*Conn, error) {
|
|||||||
c.testOnlyPacketListener = opts.TestOnlyPacketListener
|
c.testOnlyPacketListener = opts.TestOnlyPacketListener
|
||||||
c.noteRecvActivity = opts.NoteRecvActivity
|
c.noteRecvActivity = opts.NoteRecvActivity
|
||||||
|
|
||||||
if c.eventBus != nil {
|
|
||||||
c.eventClient = c.eventBus.Client("magicsock.Conn")
|
c.eventClient = c.eventBus.Client("magicsock.Conn")
|
||||||
|
|
||||||
// Subscribe calls must return before NewConn otherwise published
|
// Subscribe calls must return before NewConn otherwise published
|
||||||
@ -744,7 +746,6 @@ func NewConn(opts Options) (*Conn, error) {
|
|||||||
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
|
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
|
||||||
c.subsDoneCh = make(chan struct{})
|
c.subsDoneCh = make(chan struct{})
|
||||||
go c.consumeEventbusTopics()
|
go c.consumeEventbusTopics()
|
||||||
}
|
|
||||||
|
|
||||||
// Don't log the same log messages possibly every few seconds in our
|
// Don't log the same log messages possibly every few seconds in our
|
||||||
// portmapper.
|
// portmapper.
|
||||||
@ -3327,10 +3328,8 @@ func (c *Conn) Close() error {
|
|||||||
// deadlock with c.Close().
|
// deadlock with c.Close().
|
||||||
// 2. Conn.consumeEventbusTopics event handlers may not guard against
|
// 2. Conn.consumeEventbusTopics event handlers may not guard against
|
||||||
// undesirable post/in-progress Conn.Close() behaviors.
|
// undesirable post/in-progress Conn.Close() behaviors.
|
||||||
if c.eventClient != nil {
|
|
||||||
c.eventClient.Close()
|
c.eventClient.Close()
|
||||||
<-c.subsDoneCh
|
<-c.subsDoneCh
|
||||||
}
|
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
@ -179,7 +179,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
bus := eventbus.New()
|
bus := eventbus.New()
|
||||||
defer bus.Close()
|
t.Cleanup(bus.Close)
|
||||||
|
|
||||||
netMon, err := netmon.New(bus, logf)
|
netMon, err := netmon.New(bus, logf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -191,6 +191,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen
|
|||||||
epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary
|
epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary
|
||||||
conn, err := NewConn(Options{
|
conn, err := NewConn(Options{
|
||||||
NetMon: netMon,
|
NetMon: netMon,
|
||||||
|
EventBus: bus,
|
||||||
Metrics: ®,
|
Metrics: ®,
|
||||||
Logf: logf,
|
Logf: logf,
|
||||||
HealthTracker: ht,
|
HealthTracker: ht,
|
||||||
@ -406,7 +407,7 @@ func TestNewConn(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
bus := eventbus.New()
|
bus := eventbus.New()
|
||||||
defer bus.Close()
|
t.Cleanup(bus.Close)
|
||||||
|
|
||||||
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -424,6 +425,7 @@ func TestNewConn(t *testing.T) {
|
|||||||
EndpointsFunc: epFunc,
|
EndpointsFunc: epFunc,
|
||||||
Logf: t.Logf,
|
Logf: t.Logf,
|
||||||
NetMon: netMon,
|
NetMon: netMon,
|
||||||
|
EventBus: bus,
|
||||||
Metrics: new(usermetric.Registry),
|
Metrics: new(usermetric.Registry),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -542,7 +544,7 @@ func TestDeviceStartStop(t *testing.T) {
|
|||||||
tstest.ResourceCheck(t)
|
tstest.ResourceCheck(t)
|
||||||
|
|
||||||
bus := eventbus.New()
|
bus := eventbus.New()
|
||||||
defer bus.Close()
|
t.Cleanup(bus.Close)
|
||||||
|
|
||||||
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -554,6 +556,7 @@ func TestDeviceStartStop(t *testing.T) {
|
|||||||
EndpointsFunc: func(eps []tailcfg.Endpoint) {},
|
EndpointsFunc: func(eps []tailcfg.Endpoint) {},
|
||||||
Logf: t.Logf,
|
Logf: t.Logf,
|
||||||
NetMon: netMon,
|
NetMon: netMon,
|
||||||
|
EventBus: bus,
|
||||||
Metrics: new(usermetric.Registry),
|
Metrics: new(usermetric.Registry),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1349,7 +1352,7 @@ func newTestConn(t testing.TB) *Conn {
|
|||||||
port := pickPort(t)
|
port := pickPort(t)
|
||||||
|
|
||||||
bus := eventbus.New()
|
bus := eventbus.New()
|
||||||
defer bus.Close()
|
t.Cleanup(bus.Close)
|
||||||
|
|
||||||
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
netMon, err := netmon.New(bus, logger.WithPrefix(t.Logf, "... netmon: "))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1359,6 +1362,7 @@ func newTestConn(t testing.TB) *Conn {
|
|||||||
|
|
||||||
conn, err := NewConn(Options{
|
conn, err := NewConn(Options{
|
||||||
NetMon: netMon,
|
NetMon: netMon,
|
||||||
|
EventBus: bus,
|
||||||
HealthTracker: new(health.Tracker),
|
HealthTracker: new(health.Tracker),
|
||||||
Metrics: new(usermetric.Registry),
|
Metrics: new(usermetric.Registry),
|
||||||
DisablePortMapper: true,
|
DisablePortMapper: true,
|
||||||
@ -3147,6 +3151,7 @@ func TestNetworkDownSendErrors(t *testing.T) {
|
|||||||
Logf: t.Logf,
|
Logf: t.Logf,
|
||||||
NetMon: netMon,
|
NetMon: netMon,
|
||||||
Metrics: reg,
|
Metrics: reg,
|
||||||
|
EventBus: bus,
|
||||||
}))
|
}))
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user