mirror of
https://github.com/tailscale/tailscale.git
synced 2025-11-15 02:05:16 +00:00
appc,*: publish events for route updates and storage (#17392)
Add and wire up event publishers for these two event types in the AppConnector. Nothing currently subscribes to them, so this is harmless. Subscribers for these events will be added in a near-future commit. As part of this, move the appc.RouteInfo type to the types/appctype package. It does not contain any package-specific details from appc. Beside it, add appctype.RouteUpdate to carry route update event state, likewise not specific to appc. Update all usage of the appc.* types throughout to use appctype.* instead, and update depaware files to reflect these changes. Add a Close method to the AppConnector to make sure the client gets cleaned up when the connector is dropped (we re-create connectors). Update the unit tests in the appc package to also check the events published alongside calls to the RouteAdvertiser. For now the tests still rely on the RouteAdvertiser for correctness; this is OK for now as the two methods are always performed together. In the near future, we need to rework the tests so not require that, but that will require building some more test fixtures that we can handle separately. Updates #15160 Updates #17192 Change-Id: I184670ba2fb920e0d2cb2be7c6816259bca77afe Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
This commit is contained in:
@@ -12,12 +12,14 @@ package appc
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tailscale.com/types/appctype"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/views"
|
||||
"tailscale.com/util/clientmetric"
|
||||
@@ -114,19 +116,6 @@ func metricStoreRoutes(rate, nRoutes int64) {
|
||||
recordMetric(nRoutes, metricStoreRoutesNBuckets, metricStoreRoutesN)
|
||||
}
|
||||
|
||||
// RouteInfo is a data structure used to persist the in memory state of an AppConnector
|
||||
// so that we can know, even after a restart, which routes came from ACLs and which were
|
||||
// learned from domains.
|
||||
type RouteInfo struct {
|
||||
// Control is the routes from the 'routes' section of an app connector acl.
|
||||
Control []netip.Prefix `json:",omitempty"`
|
||||
// Domains are the routes discovered by observing DNS lookups for configured domains.
|
||||
Domains map[string][]netip.Addr `json:",omitempty"`
|
||||
// Wildcards are the configured DNS lookup domains to observe. When a DNS query matches Wildcards,
|
||||
// its result is added to Domains.
|
||||
Wildcards []string `json:",omitempty"`
|
||||
}
|
||||
|
||||
// AppConnector is an implementation of an AppConnector that performs
|
||||
// its function as a subsystem inside of a tailscale node. At the control plane
|
||||
// side App Connector routing is configured in terms of domains rather than IP
|
||||
@@ -141,9 +130,12 @@ type AppConnector struct {
|
||||
logf logger.Logf
|
||||
eventBus *eventbus.Bus
|
||||
routeAdvertiser RouteAdvertiser
|
||||
pubClient *eventbus.Client
|
||||
updatePub *eventbus.Publisher[appctype.RouteUpdate]
|
||||
storePub *eventbus.Publisher[appctype.RouteInfo]
|
||||
|
||||
// storeRoutesFunc will be called to persist routes if it is not nil.
|
||||
storeRoutesFunc func(*RouteInfo) error
|
||||
storeRoutesFunc func(*appctype.RouteInfo) error
|
||||
|
||||
// mu guards the fields that follow
|
||||
mu sync.Mutex
|
||||
@@ -181,11 +173,11 @@ type Config struct {
|
||||
|
||||
// RouteInfo, if non-nil, use used as the initial set of routes for the
|
||||
// connector. If nil, the connector starts empty.
|
||||
RouteInfo *RouteInfo
|
||||
RouteInfo *appctype.RouteInfo
|
||||
|
||||
// StoreRoutesFunc, if non-nil, is called when the connector's routes
|
||||
// change, to allow the routes to be persisted.
|
||||
StoreRoutesFunc func(*RouteInfo) error
|
||||
StoreRoutesFunc func(*appctype.RouteInfo) error
|
||||
}
|
||||
|
||||
// NewAppConnector creates a new AppConnector.
|
||||
@@ -198,10 +190,14 @@ func NewAppConnector(c Config) *AppConnector {
|
||||
case c.RouteAdvertiser == nil:
|
||||
panic("missing route advertiser")
|
||||
}
|
||||
ec := c.EventBus.Client("appc.AppConnector")
|
||||
|
||||
ac := &AppConnector{
|
||||
logf: logger.WithPrefix(c.Logf, "appc: "),
|
||||
eventBus: c.EventBus,
|
||||
pubClient: ec,
|
||||
updatePub: eventbus.Publish[appctype.RouteUpdate](ec),
|
||||
storePub: eventbus.Publish[appctype.RouteInfo](ec),
|
||||
routeAdvertiser: c.RouteAdvertiser,
|
||||
storeRoutesFunc: c.StoreRoutesFunc,
|
||||
}
|
||||
@@ -228,6 +224,14 @@ func (e *AppConnector) ShouldStoreRoutes() bool {
|
||||
|
||||
// storeRoutesLocked takes the current state of the AppConnector and persists it
|
||||
func (e *AppConnector) storeRoutesLocked() error {
|
||||
if e.storePub.ShouldPublish() {
|
||||
e.storePub.Publish(appctype.RouteInfo{
|
||||
// Clone here, as the subscriber will handle these outside our lock.
|
||||
Control: slices.Clone(e.controlRoutes),
|
||||
Domains: maps.Clone(e.domains),
|
||||
Wildcards: slices.Clone(e.wildcards),
|
||||
})
|
||||
}
|
||||
if !e.ShouldStoreRoutes() {
|
||||
return nil
|
||||
}
|
||||
@@ -240,7 +244,8 @@ func (e *AppConnector) storeRoutesLocked() error {
|
||||
e.writeRateMinute.update(numRoutes)
|
||||
e.writeRateDay.update(numRoutes)
|
||||
|
||||
return e.storeRoutesFunc(&RouteInfo{
|
||||
// TODO(creachdair): Remove this once it's delivered over the event bus.
|
||||
return e.storeRoutesFunc(&appctype.RouteInfo{
|
||||
Control: e.controlRoutes,
|
||||
Domains: e.domains,
|
||||
Wildcards: e.wildcards,
|
||||
@@ -283,6 +288,18 @@ func (e *AppConnector) Wait(ctx context.Context) {
|
||||
e.queue.Wait(ctx)
|
||||
}
|
||||
|
||||
// Close closes the connector and cleans up resources associated with it.
|
||||
// It is safe (and a noop) to call Close on nil.
|
||||
func (e *AppConnector) Close() {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.queue.Shutdown() // TODO(creachadair): Should we wait for it too?
|
||||
e.pubClient.Close()
|
||||
}
|
||||
|
||||
func (e *AppConnector) updateDomains(domains []string) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
@@ -323,11 +340,15 @@ func (e *AppConnector) updateDomains(domains []string) {
|
||||
toRemove = append(toRemove, netip.PrefixFrom(a, a.BitLen()))
|
||||
}
|
||||
}
|
||||
e.queue.Add(func() {
|
||||
if err := e.routeAdvertiser.UnadvertiseRoute(toRemove...); err != nil {
|
||||
e.logf("failed to unadvertise routes on domain removal: %v: %v: %v", slicesx.MapKeys(oldDomains), toRemove, err)
|
||||
}
|
||||
})
|
||||
|
||||
if len(toRemove) != 0 {
|
||||
e.queue.Add(func() {
|
||||
if err := e.routeAdvertiser.UnadvertiseRoute(toRemove...); err != nil {
|
||||
e.logf("failed to unadvertise routes on domain removal: %v: %v: %v", slicesx.MapKeys(oldDomains), toRemove, err)
|
||||
}
|
||||
})
|
||||
e.updatePub.Publish(appctype.RouteUpdate{Unadvertise: toRemove})
|
||||
}
|
||||
}
|
||||
|
||||
e.logf("handling domains: %v and wildcards: %v", slicesx.MapKeys(e.domains), e.wildcards)
|
||||
@@ -377,6 +398,10 @@ nextRoute:
|
||||
e.logf("failed to unadvertise routes: %v: %v", toRemove, err)
|
||||
}
|
||||
})
|
||||
e.updatePub.Publish(appctype.RouteUpdate{
|
||||
Advertise: routes,
|
||||
Unadvertise: toRemove,
|
||||
})
|
||||
|
||||
e.controlRoutes = routes
|
||||
if err := e.storeRoutesLocked(); err != nil {
|
||||
@@ -464,6 +489,7 @@ func (e *AppConnector) scheduleAdvertisement(domain string, routes ...netip.Pref
|
||||
e.logf("failed to advertise routes for %s: %v: %v", domain, routes, err)
|
||||
return
|
||||
}
|
||||
e.updatePub.Publish(appctype.RouteUpdate{Advertise: routes})
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package appc
|
||||
|
||||
import (
|
||||
stdcmp "cmp"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"reflect"
|
||||
"slices"
|
||||
@@ -11,9 +13,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"golang.org/x/net/dns/dnsmessage"
|
||||
"tailscale.com/appc/appctest"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/types/appctype"
|
||||
"tailscale.com/util/clientmetric"
|
||||
"tailscale.com/util/eventbus/eventbustest"
|
||||
"tailscale.com/util/mak"
|
||||
@@ -21,7 +26,7 @@ import (
|
||||
"tailscale.com/util/slicesx"
|
||||
)
|
||||
|
||||
func fakeStoreRoutes(*RouteInfo) error { return nil }
|
||||
func fakeStoreRoutes(*appctype.RouteInfo) error { return nil }
|
||||
|
||||
func TestUpdateDomains(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
@@ -33,14 +38,15 @@ func TestUpdateDomains(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: &appctest.RouteCollector{},
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: &appctest.RouteCollector{}})
|
||||
}
|
||||
a.UpdateDomains([]string{"example.com"})
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
a.UpdateDomains([]string{"example.com"})
|
||||
a.Wait(ctx)
|
||||
if got, want := a.Domains().AsSlice(), []string{"example.com"}; !slices.Equal(got, want) {
|
||||
t.Errorf("got %v; want %v", got, want)
|
||||
@@ -68,6 +74,7 @@ func TestUpdateRoutes(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
var a *AppConnector
|
||||
if shouldStore {
|
||||
@@ -75,11 +82,14 @@ func TestUpdateRoutes(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{}, StoreRoutesFunc: fakeStoreRoutes,
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
a.updateDomains([]string{"*.example.com"})
|
||||
|
||||
// This route should be collapsed into the range
|
||||
@@ -116,6 +126,20 @@ func TestUpdateRoutes(t *testing.T) {
|
||||
if !slices.EqualFunc(rc.RemovedRoutes(), wantRemoved, prefixEqual) {
|
||||
t.Fatalf("unexpected removed routes: %v", rc.RemovedRoutes())
|
||||
}
|
||||
|
||||
if err := eventbustest.Expect(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.2.1/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.1/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{
|
||||
Advertise: prefixes("192.0.0.1/32", "192.0.2.0/24"),
|
||||
Unadvertise: prefixes("192.0.2.1/32"),
|
||||
}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,6 +147,7 @@ func TestUpdateRoutesUnadvertisesContainedRoutes(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
var a *AppConnector
|
||||
if shouldStore {
|
||||
@@ -130,12 +155,14 @@ func TestUpdateRoutesUnadvertisesContainedRoutes(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
mak.Set(&a.domains, "example.com", []netip.Addr{netip.MustParseAddr("192.0.2.1")})
|
||||
rc.SetRoutes([]netip.Prefix{netip.MustParsePrefix("192.0.2.1/32")})
|
||||
routes := []netip.Prefix{netip.MustParsePrefix("192.0.2.0/24")}
|
||||
@@ -145,12 +172,23 @@ func TestUpdateRoutesUnadvertisesContainedRoutes(t *testing.T) {
|
||||
if !slices.EqualFunc(routes, rc.Routes(), prefixEqual) {
|
||||
t.Fatalf("got %v, want %v", rc.Routes(), routes)
|
||||
}
|
||||
|
||||
if err := eventbustest.ExpectExactly(w,
|
||||
eqUpdate(appctype.RouteUpdate{
|
||||
Advertise: prefixes("192.0.2.0/24"),
|
||||
Unadvertise: prefixes("192.0.2.1/32"),
|
||||
}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDomainRoutes(t *testing.T) {
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
var a *AppConnector
|
||||
if shouldStore {
|
||||
@@ -158,12 +196,13 @@ func TestDomainRoutes(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
a.updateDomains([]string{"example.com"})
|
||||
if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
|
||||
t.Errorf("ObserveDNSResponse: %v", err)
|
||||
@@ -177,6 +216,13 @@ func TestDomainRoutes(t *testing.T) {
|
||||
if got := a.DomainRoutes(); !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("DomainRoutes: got %v, want %v", got, want)
|
||||
}
|
||||
|
||||
if err := eventbustest.ExpectExactly(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.8/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,6 +230,7 @@ func TestObserveDNSResponse(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
var a *AppConnector
|
||||
if shouldStore {
|
||||
@@ -191,12 +238,13 @@ func TestObserveDNSResponse(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
// a has no domains configured, so it should not advertise any routes
|
||||
if err := a.ObserveDNSResponse(dnsResponse("example.com.", "192.0.0.8")); err != nil {
|
||||
@@ -273,6 +321,22 @@ func TestObserveDNSResponse(t *testing.T) {
|
||||
if !slices.Contains(a.domains["example.com"], netip.MustParseAddr("192.0.2.1")) {
|
||||
t.Errorf("missing %v from %v", "192.0.2.1", a.domains["exmaple.com"])
|
||||
}
|
||||
|
||||
if err := eventbustest.ExpectExactly(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.8/32")}), // from initial DNS response, via example.com
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.9/32")}), // from CNAME response
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.10/32")}), // from CNAME response, mid-chain
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("2001:db8::1/128")}), // v6 DNS response
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.2.0/24")}), // additional prefix
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
// N.B. no update for 192.0.2.1 as it is already covered
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,6 +344,7 @@ func TestWildcardDomains(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
var a *AppConnector
|
||||
if shouldStore {
|
||||
@@ -287,12 +352,13 @@ func TestWildcardDomains(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
a.updateDomains([]string{"*.example.com"})
|
||||
if err := a.ObserveDNSResponse(dnsResponse("foo.example.com.", "192.0.0.8")); err != nil {
|
||||
@@ -319,6 +385,13 @@ func TestWildcardDomains(t *testing.T) {
|
||||
if len(a.wildcards) != 1 {
|
||||
t.Errorf("expected only one wildcard domain, got %v", a.wildcards)
|
||||
}
|
||||
|
||||
if err := eventbustest.ExpectExactly(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("192.0.0.8/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -437,6 +510,7 @@ func TestUpdateRouteRouteRemoval(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
|
||||
assertRoutes := func(prefix string, routes, removedRoutes []netip.Prefix) {
|
||||
@@ -454,12 +528,14 @@ func TestUpdateRouteRouteRemoval(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
// nothing has yet been advertised
|
||||
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
|
||||
|
||||
@@ -482,6 +558,13 @@ func TestUpdateRouteRouteRemoval(t *testing.T) {
|
||||
wantRemovedRoutes = prefixes("1.2.3.2/32")
|
||||
}
|
||||
assertRoutes("removal", wantRoutes, wantRemovedRoutes)
|
||||
|
||||
if err := eventbustest.Expect(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.1/32", "1.2.3.2/32")}), // no duplicates here
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -489,6 +572,7 @@ func TestUpdateDomainRouteRemoval(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
|
||||
assertRoutes := func(prefix string, routes, removedRoutes []netip.Prefix) {
|
||||
@@ -506,12 +590,14 @@ func TestUpdateDomainRouteRemoval(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
|
||||
|
||||
a.UpdateDomainsAndRoutes([]string{"a.example.com", "b.example.com"}, []netip.Prefix{})
|
||||
@@ -544,6 +630,22 @@ func TestUpdateDomainRouteRemoval(t *testing.T) {
|
||||
wantRemovedRoutes = prefixes("1.2.3.3/32", "1.2.3.4/32")
|
||||
}
|
||||
assertRoutes("removal", wantRoutes, wantRemovedRoutes)
|
||||
|
||||
wantEvents := []any{
|
||||
// Each DNS record observed triggers an update.
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.1/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.2/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.3/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.4/32")}),
|
||||
}
|
||||
if shouldStore {
|
||||
wantEvents = append(wantEvents, eqUpdate(appctype.RouteUpdate{
|
||||
Unadvertise: prefixes("1.2.3.3/32", "1.2.3.4/32"),
|
||||
}))
|
||||
}
|
||||
if err := eventbustest.Expect(w, wantEvents...); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -551,6 +653,7 @@ func TestUpdateWildcardRouteRemoval(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
for _, shouldStore := range []bool{false, true} {
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
|
||||
assertRoutes := func(prefix string, routes, removedRoutes []netip.Prefix) {
|
||||
@@ -568,12 +671,14 @@ func TestUpdateWildcardRouteRemoval(t *testing.T) {
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
} else {
|
||||
a = NewAppConnector(Config{Logf: t.Logf, EventBus: bus, RouteAdvertiser: rc})
|
||||
}
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
assertRoutes("appc init", []netip.Prefix{}, []netip.Prefix{})
|
||||
|
||||
a.UpdateDomainsAndRoutes([]string{"a.example.com", "*.b.example.com"}, []netip.Prefix{})
|
||||
@@ -606,6 +711,22 @@ func TestUpdateWildcardRouteRemoval(t *testing.T) {
|
||||
wantRemovedRoutes = prefixes("1.2.3.3/32", "1.2.3.4/32")
|
||||
}
|
||||
assertRoutes("removal", wantRoutes, wantRemovedRoutes)
|
||||
|
||||
wantEvents := []any{
|
||||
// Each DNS record observed triggers an update.
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.1/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.2/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.3/32")}),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("1.2.3.4/32")}),
|
||||
}
|
||||
if shouldStore {
|
||||
wantEvents = append(wantEvents, eqUpdate(appctype.RouteUpdate{
|
||||
Unadvertise: prefixes("1.2.3.3/32", "1.2.3.4/32"),
|
||||
}))
|
||||
}
|
||||
if err := eventbustest.Expect(w, wantEvents...); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -708,17 +829,23 @@ func TestMetricBucketsAreSorted(t *testing.T) {
|
||||
// routeAdvertiser, calls to Advertise/UnadvertiseRoutes can end up calling
|
||||
// back into AppConnector via authReconfig. If everything is called
|
||||
// synchronously, this results in a deadlock on AppConnector.mu.
|
||||
//
|
||||
// TODO(creachadair, 2025-09-18): Remove this along with the advertiser
|
||||
// interface once the LocalBackend is switched to use the event bus and the
|
||||
// tests have been updated not to need it.
|
||||
func TestUpdateRoutesDeadlock(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
bus := eventbustest.NewBus(t)
|
||||
w := eventbustest.NewWatcher(t, bus)
|
||||
rc := &appctest.RouteCollector{}
|
||||
a := NewAppConnector(Config{
|
||||
Logf: t.Logf,
|
||||
EventBus: bus,
|
||||
RouteAdvertiser: rc,
|
||||
RouteInfo: &RouteInfo{},
|
||||
RouteInfo: &appctype.RouteInfo{},
|
||||
StoreRoutesFunc: fakeStoreRoutes,
|
||||
})
|
||||
t.Cleanup(a.Close)
|
||||
|
||||
advertiseCalled := new(atomic.Bool)
|
||||
unadvertiseCalled := new(atomic.Bool)
|
||||
@@ -762,4 +889,42 @@ func TestUpdateRoutesDeadlock(t *testing.T) {
|
||||
if want := []netip.Prefix{netip.MustParsePrefix("127.0.0.1/32")}; !slices.Equal(slices.Compact(rc.Routes()), want) {
|
||||
t.Fatalf("got %v, want %v", rc.Routes(), want)
|
||||
}
|
||||
|
||||
if err := eventbustest.ExpectExactly(w,
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("127.0.0.1/32", "127.0.0.2/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
eqUpdate(appctype.RouteUpdate{Advertise: prefixes("127.0.0.1/32"), Unadvertise: prefixes("127.0.0.2/32")}),
|
||||
eventbustest.Type[appctype.RouteInfo](),
|
||||
); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
type textUpdate struct {
|
||||
Advertise []string
|
||||
Unadvertise []string
|
||||
}
|
||||
|
||||
func routeUpdateToText(u appctype.RouteUpdate) textUpdate {
|
||||
var out textUpdate
|
||||
for _, p := range u.Advertise {
|
||||
out.Advertise = append(out.Advertise, p.String())
|
||||
}
|
||||
for _, p := range u.Unadvertise {
|
||||
out.Unadvertise = append(out.Unadvertise, p.String())
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// eqUpdate generates an eventbus test filter that matches a appctype.RouteUpdate
|
||||
// message equal to want, or reports an error giving a human-readable diff.
|
||||
func eqUpdate(want appctype.RouteUpdate) func(appctype.RouteUpdate) error {
|
||||
return func(got appctype.RouteUpdate) error {
|
||||
if diff := cmp.Diff(routeUpdateToText(got), routeUpdateToText(want),
|
||||
cmpopts.SortSlices(stdcmp.Less[string]),
|
||||
); diff != "" {
|
||||
return fmt.Errorf("wrong update (-got, +want):\n%s", diff)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user