Fran Bull 2025-04-16 10:21:50 -07:00
parent 4cb9d5c183
commit 4f2e20cade
6 changed files with 550 additions and 10 deletions


@@ -0,0 +1,323 @@
package ippool
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/netip"
"sync"
"time"
"github.com/gaissmai/bart"
"github.com/hashicorp/raft"
"go4.org/netipx"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tsconsensus"
"tailscale.com/tsnet"
"tailscale.com/util/mak"
)
/*
TODO(fran)

A ConsensusIPPool is a group of one or more IPv4 ranges from which individual IPv4 addresses can be
checked out.

natc-consensus provides per-domain routing functionality for a tailnet.
- when a node does a DNS lookup for a domain natc-consensus handles, natc-consensus asks ConsensusIPPool
for an IP address for that node and domain.
- when a node sends traffic to the IP address it has for a domain, natc-consensus asks ConsensusIPPool which domain
that traffic is for.
- when an IP address hasn't been used for a while, ConsensusIPPool forgets that node-ip-domain mapping and may provide
that IP address to that node in response to a subsequent DNS request.

The pool is distributed across servers in a cluster, to provide high availability.

Each tailcfg.NodeID has the full range available, so the same IPv4 address may be provided to different nodes.
ConsensusIPPool maintains a node-ip-domain mapping until it expires, and won't hand out that IP address to that node
again while it maintains the mapping.

Reading from the pool is fast; writing to the pool is slow. Reads can be served from memory on the server that got
the traffic, but writes must be sent to the consensus peers.
To handle expiry we write on reads, to update the last-used time, but we do that after we've returned a response.

ConsensusIPPool.DomainForIP gets the domain associated with a previous IP checkout for a node.
ConsensusIPPool.IPForDomain gets an IP address for the node+domain. It will return an IP address from any existing
mapping, or it may create a mapping with a new unused IP address.
*/
type ConsensusIPPool struct {
IPSet *netipx.IPSet
perPeerMap syncs.Map[tailcfg.NodeID, *consensusPerPeerState]
consensus commandExecutor
}
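// exampleFlow is an illustrative sketch only (nothing in this package calls it): it shows the
// intended call sequence against a started pool. A DNS layer asks IPForDomain for an address
// for the querying node, and the traffic path later maps a packet's destination address back
// to a domain with DomainForIP.
func exampleFlow(ipp *ConsensusIPPool, nid tailcfg.NodeID) {
	// May perform a consensus write if no mapping exists for this node+domain yet.
	addr, err := ipp.IPForDomain(nid, "example.com")
	if err != nil {
		log.Printf("exampleFlow: %v", err)
		return
	}
	// Served from local memory; the last-used update happens asynchronously.
	if domain, ok := ipp.DomainForIP(nid, addr, time.Now()); ok {
		log.Printf("traffic to %v is for %s", addr, domain)
	}
}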
func (ipp *ConsensusIPPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr, updatedAt time.Time) (string, bool) {
// TODO (fran) lock?
pm, ok := ipp.perPeerMap.Load(from)
if !ok {
log.Printf("DomainForIP: peer state absent for: %d", from)
return "", false
}
ww, ok := pm.AddrToDomain.Lookup(addr)
if !ok {
log.Printf("DomainForIP: peer state doesn't recognize addr: %v", addr)
return "", false
}
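// Update the mapping's last-used time asynchronously, so the read path returns without
// waiting on a consensus write (per the package comment, we write on reads, after the
// response has been returned).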
go func() {
err := ipp.markLastUsed(from, addr, ww.Domain, updatedAt)
if err != nil {
panic(err)
}
}()
return ww.Domain, true
}
type markLastUsedArgs struct {
NodeID tailcfg.NodeID
Addr netip.Addr
Domain string
UpdatedAt time.Time
}
func (ipp *ConsensusIPPool) executeMarkLastUsed(bs []byte) tsconsensus.CommandResult {
var args markLastUsedArgs
err := json.Unmarshal(bs, &args)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
err = ipp.applyMarkLastUsed(args.NodeID, args.Addr, args.Domain, args.UpdatedAt)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
return tsconsensus.CommandResult{}
}
func (ipp *ConsensusIPPool) applyMarkLastUsed(from tailcfg.NodeID, addr netip.Addr, domain string, updatedAt time.Time) error {
// TODO (fran) lock?
ps, ok := ipp.perPeerMap.Load(from)
if !ok {
// There's nothing to mark. But this is unexpected, because we mark last used after we do things with peer state.
log.Printf("applyMarkLastUsed: could not find peer state, nodeID: %d", from)
return nil
}
ww, ok := ps.AddrToDomain.Lookup(addr)
if !ok {
// The peer state didn't have an entry for the IP address (possibly it expired), so there's nothing to mark.
return nil
}
if ww.Domain != domain {
// The IP address expired and was reused for a new domain. Don't mark.
return nil
}
if ww.LastUsed.After(updatedAt) {
// This has been marked more recently. Don't mark.
return nil
}
ww.LastUsed = updatedAt
ps.AddrToDomain.Insert(netip.PrefixFrom(addr, addr.BitLen()), ww)
return nil
}
func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string) error {
cfg := tsconsensus.DefaultConfig()
cfg.ServeDebugMonitor = true
cns, err := tsconsensus.Start(ctx, ts, ipp, clusterTag, cfg)
if err != nil {
return err
}
ipp.consensus = cns
return nil
}
type whereWhen struct {
Domain string
LastUsed time.Time
}
type consensusPerPeerState struct {
DomainToAddr map[string]netip.Addr
AddrToDomain *bart.Table[whereWhen]
mu sync.Mutex // not jsonified
}
func (ipp *ConsensusIPPool) StopConsensus(ctx context.Context) error {
return ipp.consensus.(*tsconsensus.Consensus).Stop(ctx)
}
// unusedIPV4 finds the next unused or expired IP address in the pool.
// IP addresses in the pool are reused if they haven't been used for some period of time.
// reuseDeadline is the time before which addresses are considered expired:
// if addresses are reused after they haven't been used for 24 hours, say, reuseDeadline
// would be 24 hours ago.
func (ps *consensusPerPeerState) unusedIPV4(ipset *netipx.IPSet, reuseDeadline time.Time) (netip.Addr, bool, string, error) {
// TODO (fran) here we iterate through each IP within the ranges until we find one that's unused or expired.
// If we want random IP choice behavior we could make that work with the state machine by
// passing the randomly chosen IP into the state machine call (so replaying logs would still be deterministic).
for _, r := range ipset.Ranges() {
ip := r.From()
toIP := r.To()
if !ip.IsValid() || !toIP.IsValid() {
continue
}
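// Scan the range linearly from its first address; the condition reads "while ip <= toIP".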
for toIP.Compare(ip) != -1 {
ww, ok := ps.AddrToDomain.Lookup(ip)
if !ok {
return ip, false, "", nil
}
if ww.LastUsed.Before(reuseDeadline) {
return ip, true, ww.Domain, nil
}
ip = ip.Next()
}
}
return netip.Addr{}, false, "", errors.New("ip pool exhausted")
}
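// For illustration, a caller that wanted to reuse addresses idle for 24 hours would pass
// (IPForDomain below uses 48 hours):
//
//	reuseDeadline := time.Now().Add(-24 * time.Hour)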
func (ipp *ConsensusIPPool) IPForDomain(nid tailcfg.NodeID, domain string) (netip.Addr, error) {
now := time.Now()
args := checkoutAddrArgs{
NodeID: nid,
Domain: domain,
ReuseDeadline: now.Add(-48 * time.Hour), // TODO (fran) is this good? should it be configurable?
UpdatedAt: now,
}
bs, err := json.Marshal(args)
if err != nil {
return netip.Addr{}, err
}
c := tsconsensus.Command{
Name: "checkoutAddr",
Args: bs,
}
result, err := ipp.consensus.ExecuteCommand(c)
if err != nil {
log.Printf("IPForDomain: raft error executing command: %v", err)
return netip.Addr{}, err
}
if result.Err != nil {
log.Printf("IPForDomain: error returned from state machine: %v", result.Err)
return netip.Addr{}, result.Err
}
var addr netip.Addr
err = json.Unmarshal(result.Result, &addr)
return addr, err
}
func (ipp *ConsensusIPPool) markLastUsed(nid tailcfg.NodeID, addr netip.Addr, domain string, lastUsed time.Time) error {
args := markLastUsedArgs{
NodeID: nid,
Addr: addr,
Domain: domain,
UpdatedAt: lastUsed,
}
bs, err := json.Marshal(args)
if err != nil {
return err
}
c := tsconsensus.Command{
Name: "markLastUsed",
Args: bs,
}
result, err := ipp.consensus.ExecuteCommand(c)
if err != nil {
log.Printf("markLastUsed: raft error executing command: %v", err)
return err
}
if result.Err != nil {
log.Printf("markLastUsed: error returned from state machine: %v", err)
return result.Err
}
return nil
}
type checkoutAddrArgs struct {
NodeID tailcfg.NodeID
Domain string
ReuseDeadline time.Time
UpdatedAt time.Time
}
func (ipp *ConsensusIPPool) executeCheckoutAddr(bs []byte) tsconsensus.CommandResult {
var args checkoutAddrArgs
err := json.Unmarshal(bs, &args)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
addr, err := ipp.applyCheckoutAddr(args.NodeID, args.Domain, args.ReuseDeadline, args.UpdatedAt)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
resultBs, err := json.Marshal(addr)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
return tsconsensus.CommandResult{Result: resultBs}
}
// applyCheckoutAddr finds an IP address for a nid+domain pair.
// Each nid can use all of the addresses in the pool.
// updatedAt is the current time, the time at which we are wanting to get a new IP address.
// reuseDeadline is the time before which addresses are considered expired:
// if addresses are reused after they haven't been used for 24 hours, say, updatedAt would
// be now and reuseDeadline would be 24 hours ago.
func (ipp *ConsensusIPPool) applyCheckoutAddr(nid tailcfg.NodeID, domain string, reuseDeadline, updatedAt time.Time) (netip.Addr, error) {
// TODO (fran) lock and unlock (we need to right?)
pm, _ := ipp.perPeerMap.LoadOrStore(nid, &consensusPerPeerState{
AddrToDomain: &bart.Table[whereWhen]{},
})
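// If this node already has an address for this domain, refresh the mapping's last-used
// time and return the existing address.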
if existing, ok := pm.DomainToAddr[domain]; ok {
// TODO (fran) handle error case where this doesn't exist
ww, _ := pm.AddrToDomain.Lookup(existing)
ww.LastUsed = updatedAt
pm.AddrToDomain.Insert(netip.PrefixFrom(existing, existing.BitLen()), ww)
return existing, nil
}
addr, wasInUse, previousDomain, err := pm.unusedIPV4(ipp.IPSet, reuseDeadline)
if err != nil {
return netip.Addr{}, err
}
mak.Set(&pm.DomainToAddr, domain, addr)
if wasInUse {
delete(pm.DomainToAddr, previousDomain)
}
pm.AddrToDomain.Insert(netip.PrefixFrom(addr, addr.BitLen()), whereWhen{Domain: domain, LastUsed: updatedAt})
return addr, nil
}
// Apply fulfills the raft.FSM interface. Raft calls it on every node in the cluster to
// apply a committed log entry to the state machine, so it must be deterministic.
func (ipp *ConsensusIPPool) Apply(l *raft.Log) interface{} {
var c tsconsensus.Command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
}
switch c.Name {
case "checkoutAddr":
return ipp.executeCheckoutAddr(c.Args)
case "markLastUsed":
return ipp.executeMarkLastUsed(c.Args)
default:
panic(fmt.Sprintf("unrecognized command: %s", c.Name))
}
}
// TODO(fran) what exactly would we gain by implementing Snapshot and Restore?
func (ipp *ConsensusIPPool) Snapshot() (raft.FSMSnapshot, error) {
return nil, nil
}
func (ipp *ConsensusIPPool) Restore(rc io.ReadCloser) error {
return nil
}
// commandExecutor is an interface covering the routing parts of consensus,
// used to allow a fake in the tests.
type commandExecutor interface {
ExecuteCommand(tsconsensus.Command) (tsconsensus.CommandResult, error)
}
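// FakeConsensus in the package tests is one such fake; the real implementation is the
// tsconsensus.Consensus that StartConsensus wires up.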


@@ -0,0 +1,185 @@
package ippool
import (
"encoding/json"
"fmt"
"net/netip"
"testing"
"time"
"github.com/hashicorp/raft"
"go4.org/netipx"
"tailscale.com/tailcfg"
"tailscale.com/tsconsensus"
"tailscale.com/util/must"
)
func makeSetFromPrefix(pfx netip.Prefix) *netipx.IPSet {
var ipsb netipx.IPSetBuilder
ipsb.AddPrefix(pfx)
return must.Get(ipsb.IPSet())
}
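// FakeConsensus stands in for a real raft cluster in tests: ExecuteCommand applies each
// command directly to the local state machine, with no replication involved.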
type FakeConsensus struct {
ipp *ConsensusIPPool
}
func (c *FakeConsensus) ExecuteCommand(cmd tsconsensus.Command) (tsconsensus.CommandResult, error) {
b, err := json.Marshal(cmd)
if err != nil {
return tsconsensus.CommandResult{}, err
}
result := c.ipp.Apply(&raft.Log{Data: b})
return result.(tsconsensus.CommandResult), nil
}
func makePool(pfx netip.Prefix) *ConsensusIPPool {
ipp := &ConsensusIPPool{
IPSet: makeSetFromPrefix(pfx),
}
ipp.consensus = &FakeConsensus{ipp: ipp}
return ipp
}
func TestConsensusIPForDomain(t *testing.T) {
pfx := netip.MustParsePrefix("100.64.0.0/16")
ipp := makePool(pfx)
from := tailcfg.NodeID(1)
a, err := ipp.IPForDomain(from, "example.com")
if err != nil {
t.Fatal(err)
}
if !pfx.Contains(a) {
t.Fatalf("expected %v to be in the prefix %v", a, pfx)
}
b, err := ipp.IPForDomain(from, "a.example.com")
if err != nil {
t.Fatal(err)
}
if !pfx.Contains(b) {
t.Fatalf("expected %v to be in the prefix %v", b, pfx)
}
if b == a {
t.Fatalf("same address issued twice %v, %v", a, b)
}
c, err := ipp.IPForDomain(from, "example.com")
if err != nil {
t.Fatal(err)
}
if c != a {
t.Fatalf("expected %v to be remembered as the addr for example.com, but got %v", a, c)
}
}
func TestConsensusPoolExhaustion(t *testing.T) {
ipp := makePool(netip.MustParsePrefix("100.64.0.0/31"))
from := tailcfg.NodeID(1)
subdomains := []string{"a", "b", "c"}
for i, sd := range subdomains {
_, err := ipp.IPForDomain(from, fmt.Sprintf("%s.example.com", sd))
if i < 2 && err != nil {
t.Fatal(err)
}
expected := "ip pool exhausted"
if i == 2 && (err == nil || err.Error() != expected) {
t.Fatalf("expected error to be '%s', got '%v'", expected, err)
}
}
}
func TestConsensusPoolExpiry(t *testing.T) {
ipp := makePool(netip.MustParsePrefix("100.64.0.0/31"))
firstIP := netip.MustParseAddr("100.64.0.0")
secondIP := netip.MustParseAddr("100.64.0.1")
timeOfUse := time.Now()
beforeTimeOfUse := timeOfUse.Add(-2 * time.Hour)
afterTimeOfUse := timeOfUse.Add(2 * time.Hour)
from := tailcfg.NodeID(1)
// the pool is unused, we get an address, and it's marked as being used at timeOfUse
aAddr, err := ipp.applyCheckoutAddr(from, "a.example.com", timeOfUse, timeOfUse)
if err != nil {
t.Fatal(err)
}
if aAddr.Compare(firstIP) != 0 {
t.Fatalf("expected %s, got %s", firstIP, aAddr)
}
d, ok := ipp.DomainForIP(from, firstIP, timeOfUse)
if !ok {
t.Fatal("expected addr to be found")
}
if d != "a.example.com" {
t.Fatalf("expected aAddr to look up to a.example.com, got: %s", d)
}
// the time before which we will reuse addresses is prior to timeOfUse, so no reuse
bAddr, err := ipp.applyCheckoutAddr(from, "b.example.com", beforeTimeOfUse, timeOfUse)
if err != nil {
t.Fatal(err)
}
if bAddr.Compare(secondIP) != 0 {
t.Fatalf("expected %s, got %s", secondIP, bAddr)
}
// the time before which we will reuse addresses is after timeOfUse, so reuse addresses that were marked as used at timeOfUse.
cAddr, err := ipp.applyCheckoutAddr(from, "c.example.com", afterTimeOfUse, timeOfUse)
if err != nil {
t.Fatal(err)
}
if cAddr.Compare(firstIP) != 0 {
t.Fatalf("expected %s, got %s", firstIP, cAddr)
}
d, ok = ipp.DomainForIP(from, firstIP, timeOfUse)
if !ok {
t.Fatal("expected addr to be found")
}
if d != "c.example.com" {
t.Fatalf("expected firstIP to look up to c.example.com, got: %s", d)
}
// the addr remains associated with c.example.com
cAddrAgain, err := ipp.applyCheckoutAddr(from, "c.example.com", beforeTimeOfUse, timeOfUse)
if err != nil {
t.Fatal(err)
}
if cAddrAgain.Compare(cAddr) != 0 {
t.Fatalf("expected cAddrAgain to be cAddr, but they are different. cAddrAgain=%s cAddr=%s", cAddrAgain, cAddr)
}
d, ok = ipp.DomainForIP(from, firstIP, timeOfUse)
if !ok {
t.Fatal("expected addr to be found")
}
if d != "c.example.com" {
t.Fatalf("expected firstIP to look up to c.example.com, got: %s", d)
}
}
func TestConsensusDomainForIP(t *testing.T) {
ipp := makePool(netip.MustParsePrefix("100.64.0.0/16"))
from := tailcfg.NodeID(1)
domain := "example.com"
now := time.Now()
d, ok := ipp.DomainForIP(from, netip.MustParseAddr("100.64.0.1"), now)
if d != "" {
t.Fatalf("expected an empty string if the addr is not found but got %s", d)
}
if ok {
t.Fatalf("expected domain to not be found for IP, as it has never been looked up")
}
a, err := ipp.IPForDomain(from, domain)
if err != nil {
t.Fatal(err)
}
d2, ok := ipp.DomainForIP(from, a, now)
if d2 != domain {
t.Fatalf("expected %s but got %s", domain, d2)
}
if !ok {
t.Fatalf("expected domain to be found for IP that was handed out for it")
}
}


@@ -5,28 +5,44 @@
package ippool
import (
"context"
"errors"
"log"
"math/big"
"net/netip"
"sync"
"time"
"github.com/gaissmai/bart"
"go4.org/netipx"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/util/dnsname"
"tailscale.com/util/mak"
)
var ErrNoIPsAvailable = errors.New("no IPs available")
type IPPool struct {
type IPPool interface {
StartConsensus(context.Context, *tsnet.Server, string) error
StopConsensus(context.Context) error
DomainForIP(tailcfg.NodeID, netip.Addr, time.Time) (string, bool)
IPForDomain(tailcfg.NodeID, string) (netip.Addr, error)
}
type SingleMachineIPPool struct {
perPeerMap syncs.Map[tailcfg.NodeID, *perPeerState]
IPSet *netipx.IPSet
}
func (ipp *IPPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr) (string, bool) {
func (*SingleMachineIPPool) StartConsensus(context.Context, *tsnet.Server, string) error {
return nil
}
func (*SingleMachineIPPool) StopConsensus(context.Context) error {
return nil
}
func (ipp *SingleMachineIPPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr, t time.Time) (string, bool) {
ps, ok := ipp.perPeerMap.Load(from)
if !ok {
log.Printf("handleTCPFlow: no perPeerState for %v", from)
@@ -40,7 +56,7 @@ func (ipp *IPPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr) (string, bool) {
return domain, ok
}
func (ipp *IPPool) IPForDomain(from tailcfg.NodeID, domain string) (netip.Addr, error) {
func (ipp *SingleMachineIPPool) IPForDomain(from tailcfg.NodeID, domain string) (netip.Addr, error) {
npps := &perPeerState{
ipset: ipp.IPSet,
}


@@ -8,6 +8,7 @@ import (
"fmt"
"net/netip"
"testing"
"time"
"go4.org/netipx"
"tailscale.com/tailcfg"
@@ -19,7 +20,7 @@ func TestIPPoolExhaustion(t *testing.T) {
var ipsb netipx.IPSetBuilder
ipsb.AddPrefix(smallPrefix)
addrPool := must.Get(ipsb.IPSet())
pool := IPPool{IPSet: addrPool}
pool := SingleMachineIPPool{IPSet: addrPool}
assignedIPs := make(map[netip.Addr]string)
@@ -68,7 +69,7 @@ func TestIPPool(t *testing.T) {
var ipsb netipx.IPSetBuilder
ipsb.AddPrefix(netip.MustParsePrefix("100.64.1.0/24"))
addrPool := must.Get(ipsb.IPSet())
pool := IPPool{
pool := SingleMachineIPPool{
IPSet: addrPool,
}
from := tailcfg.NodeID(12345)
@@ -89,7 +90,7 @@ func TestIPPool(t *testing.T) {
t.Errorf("IPv4 address %s not in range %s", addr, addrPool)
}
domain, ok := pool.DomainForIP(from, addr)
domain, ok := pool.DomainForIP(from, addr, time.Now())
if !ok {
t.Errorf("domainForIP(%s) not found", addr)
} else if domain != "example.com" {


@@ -57,6 +57,7 @@ func main() {
printULA = fs.Bool("print-ula", false, "print the ULA prefix and exit")
ignoreDstPfxStr = fs.String("ignore-destinations", "", "comma-separated list of prefixes to ignore")
wgPort = fs.Uint("wg-port", 0, "udp port for wireguard and peer to peer traffic")
clusterTag = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag")
)
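// A hypothetical invocation joining a consensus cluster (assumes the tailnet defines an
// ACL tag named tag:natc-cluster):
//
//	natc --cluster-tag=tag:natc-cluster [other flags]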
ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
@@ -94,6 +95,7 @@ func main() {
ts := &tsnet.Server{
Hostname: *hostname,
}
ts.ControlURL = "http://host.docker.internal:31544" // TODO fran
if *wgPort != 0 {
if *wgPort >= 1<<16 {
log.Fatalf("wg-port must be in the range [0, 65535]")
@@ -148,12 +150,25 @@ func main() {
routes, dnsAddr, addrPool := calculateAddresses(prefixes)
v6ULA := ula(uint16(*siteID))
var ipp ippool.IPPool
if *clusterTag != "" {
ipp = &ippool.ConsensusIPPool{IPSet: addrPool}
err = ipp.StartConsensus(ctx, ts, *clusterTag)
if err != nil {
log.Fatalf("StartConsensus: %v", err)
}
defer ipp.StopConsensus(ctx)
} else {
ipp = &ippool.SingleMachineIPPool{IPSet: addrPool}
}
c := &connector{
ts: ts,
whois: lc,
v6ULA: v6ULA,
ignoreDsts: ignoreDstTable,
ipPool: &ippool.IPPool{IPSet: addrPool},
ipPool: ipp,
routes: routes,
dnsAddr: dnsAddr,
resolver: net.DefaultResolver,
@@ -209,7 +224,7 @@ type connector struct {
ignoreDsts *bart.Table[bool]
// ipPool contains the per-peer IPv4 address assignments.
ipPool *ippool.IPPool
ipPool ippool.IPPool
// resolver is used to lookup IP addresses for DNS queries.
resolver lookupNetIPer
@@ -453,7 +468,7 @@ func (c *connector) handleTCPFlow(src, dst netip.AddrPort) (handler func(net.Con
if dstAddr.Is6() {
dstAddr = v4ForV6(dstAddr)
}
domain, ok := c.ipPool.DomainForIP(who.Node.ID, dstAddr)
domain, ok := c.ipPool.DomainForIP(who.Node.ID, dstAddr, time.Now())
if !ok {
return nil, false
}


@@ -270,7 +270,7 @@ func TestDNSResponse(t *testing.T) {
ignoreDsts: &bart.Table[bool]{},
routes: routes,
v6ULA: v6ULA,
ipPool: &ippool.IPPool{IPSet: addrPool},
ipPool: &ippool.SingleMachineIPPool{IPSet: addrPool},
dnsAddr: dnsAddr,
}
c.ignoreDsts.Insert(netip.MustParsePrefix("8.8.4.4/32"), true)