
Currently consensus has a bootstrap routine where a tsnet node tries to join each other node with the cluster tag, and if it is not able to join any other node it starts its own cluster. That algorithm is racy, and can result in split brain (more than one leader/cluster) if all the nodes for a cluster are started at the same time. Add a FollowAddr argument to the bootstrap function. If provided this tsnet node will never lead, it will try (and retry with exponential back off) to follow the node at the provided address. Add a --follow flag to cmd/natc that uses this new tsconsensus functionality. Also slightly reorganize some arguments into opts structs. Updates #14667 Signed-off-by: Fran Bull <fran@tailscale.com>
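For illustration only, a caller such as cmd/natc might wire this pool up roughly as follows. This is a minimal sketch: the tsnet.Server setup, the IP set construction, and the variable/flag names (ctx, tsnetServer, ipSet, stateDir, followAddr, the example tag) are assumptions, not part of this file.

	pool := ippool.NewConsensusIPPool(ipSet) // ipSet: a *netipx.IPSet of addresses to hand out
	err := pool.StartConsensus(ctx, tsnetServer, ippool.ClusterOpts{
		Tag:      "tag:natc-consensus", // hypothetical cluster tag
		StateDir: stateDir,             // directory for consensus state
		Follow:   followAddr,           // optional: if set, this node never leads and follows the given node
	})
	if err != nil {
		log.Fatalf("StartConsensus: %v", err)
	}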
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package ippool

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/netip"
	"time"

	"github.com/hashicorp/raft"
	"go4.org/netipx"
	"tailscale.com/syncs"
	"tailscale.com/tailcfg"
	"tailscale.com/tsconsensus"
	"tailscale.com/tsnet"
	"tailscale.com/util/mak"
)

// ConsensusIPPool implements an [IPPool] that is distributed among members of a cluster for high availability.
// Writes are directed to the cluster leader and are slower than reads; reads are performed locally
// using information replicated from the leader.
// The cluster maintains consistency: reads can be stale, and writes can be unavailable if too many
// cluster peers are unavailable.
type ConsensusIPPool struct {
	IPSet                 *netipx.IPSet
	perPeerMap            *syncs.Map[tailcfg.NodeID, *consensusPerPeerState]
	consensus             commandExecutor
	unusedAddressLifetime time.Duration
}

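// NewConsensusIPPool returns a ConsensusIPPool that allocates addresses from ipSet.
// The returned pool does not participate in the cluster until StartConsensus is called.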
func NewConsensusIPPool(ipSet *netipx.IPSet) *ConsensusIPPool {
	return &ConsensusIPPool{
		unusedAddressLifetime: 48 * time.Hour, // TODO (fran) is this appropriate? should it be configurable?
		IPSet:                 ipSet,
		perPeerMap:            &syncs.Map[tailcfg.NodeID, *consensusPerPeerState]{},
	}
}

// IPForDomain looks up or creates an IP address allocation for the tailcfg.NodeID and domain pair.
// If no address association is found, one is allocated from the range of free addresses for this tailcfg.NodeID.
// If no more addresses are available, an error is returned.
func (ipp *ConsensusIPPool) IPForDomain(nid tailcfg.NodeID, domain string) (netip.Addr, error) {
	now := time.Now()
	// Check local state; local state may be stale. If we have an IP for this domain, and we are not
	// close to the expiry time for the domain, it's safe to return what we have.
	ps, psFound := ipp.perPeerMap.Load(nid)
	if psFound {
		if addr, addrFound := ps.domainToAddr[domain]; addrFound {
			if ww, wwFound := ps.addrToDomain.Load(addr); wwFound {
				if !isCloseToExpiry(ww.LastUsed, now, ipp.unusedAddressLifetime) {
					ipp.fireAndForgetMarkLastUsed(nid, addr, ww, now)
					return addr, nil
				}
			}
		}
	}

	// go via consensus
	args := checkoutAddrArgs{
		NodeID:        nid,
		Domain:        domain,
		ReuseDeadline: now.Add(-1 * ipp.unusedAddressLifetime),
		UpdatedAt:     now,
	}
	bs, err := json.Marshal(args)
	if err != nil {
		return netip.Addr{}, err
	}
	c := tsconsensus.Command{
		Name: "checkoutAddr",
		Args: bs,
	}
	result, err := ipp.consensus.ExecuteCommand(c)
	if err != nil {
		log.Printf("IPForDomain: raft error executing command: %v", err)
		return netip.Addr{}, err
	}
	if result.Err != nil {
		log.Printf("IPForDomain: error returned from state machine: %v", result.Err)
		return netip.Addr{}, result.Err
	}
	var addr netip.Addr
	err = json.Unmarshal(result.Result, &addr)
	return addr, err
}

// DomainForIP looks up the domain associated with a tailcfg.NodeID and netip.Addr pair.
// If there is no association, the result is empty and ok is false.
func (ipp *ConsensusIPPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr, updatedAt time.Time) (string, bool) {
	// Look in local state, to save a consensus round trip; local state may be stale.
	//
	// The only time we expect ordering of commands to matter to clients is on first
	// connection to a domain. In that case it may be that although we don't find the
	// domain in our local state, it is in fact in the state of the state machine (ie
	// the client did a DNS lookup, and we responded with an IP and _should_ know that
	// domain when the TCP connection for that IP arrives.)
	//
	// So it's ok to return local state, unless local state doesn't recognize the domain,
	// in which case we should check the consensus state machine to know for sure.
	var domain string
	ww, ok := ipp.domainLookup(from, addr)
	if ok {
		domain = ww.Domain
	} else {
		d, err := ipp.readDomainForIP(from, addr)
		if err != nil {
			log.Printf("error reading domain from consensus: %v", err)
			return "", false
		}
		domain = d
	}
	if domain == "" {
		log.Printf("did not find domain for node: %v, addr: %s", from, addr)
		return "", false
	}
	ipp.fireAndForgetMarkLastUsed(from, addr, ww, updatedAt)
	return domain, true
}

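// fireAndForgetMarkLastUsed records that addr was used at updatedAt by issuing a markLastUsed
// command from a background goroutine. The write is skipped if ww.LastUsed is already within
// five minutes of updatedAt, to avoid a consensus round trip for every connection.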
func (ipp *ConsensusIPPool) fireAndForgetMarkLastUsed(from tailcfg.NodeID, addr netip.Addr, ww whereWhen, updatedAt time.Time) {
	window := 5 * time.Minute
	if updatedAt.Sub(ww.LastUsed).Abs() < window {
		return
	}
	go func() {
		err := ipp.markLastUsed(from, addr, ww.Domain, updatedAt)
		if err != nil {
			log.Printf("error marking last used: %v", err)
		}
	}()
}

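// domainLookup returns the whereWhen stored locally for the given node and address,
// or ok=false if this node's replicated state has no entry for the pair.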
func (ipp *ConsensusIPPool) domainLookup(from tailcfg.NodeID, addr netip.Addr) (whereWhen, bool) {
	ps, ok := ipp.perPeerMap.Load(from)
	if !ok {
		log.Printf("domainLookup: peer state absent for: %d", from)
		return whereWhen{}, false
	}
	ww, ok := ps.addrToDomain.Load(addr)
	if !ok {
		log.Printf("domainLookup: peer state doesn't recognize addr: %s", addr)
		return whereWhen{}, false
	}
	return ww, true
}

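// ClusterOpts carries the cluster configuration passed to StartConsensus: the tag that
// identifies cluster members, the directory where consensus state is stored, and an
// optional address of a node to follow rather than attempting to lead.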
type ClusterOpts struct {
	Tag      string
	StateDir string
	Follow   string
}

// StartConsensus is part of the IPPool interface. It starts the raft background routines that handle consensus.
func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, opts ClusterOpts) error {
	cfg := tsconsensus.DefaultConfig()
	cfg.ServeDebugMonitor = true
	cfg.StateDirPath = opts.StateDir
	cns, err := tsconsensus.Start(ctx, ts, ipp, tsconsensus.BootstrapOpts{
		Tag:        opts.Tag,
		FollowAddr: opts.Follow,
	}, cfg)
	if err != nil {
		return err
	}
	ipp.consensus = cns
	return nil
}

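// whereWhen records which domain an address is currently allocated to and when that
// allocation was last used.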
type whereWhen struct {
	Domain   string
	LastUsed time.Time
}

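// consensusPerPeerState holds the address allocations for a single peer: a
// domain-to-address map for lookups by domain, and an address-to-whereWhen map for
// lookups by address.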
type consensusPerPeerState struct {
	domainToAddr map[string]netip.Addr
	addrToDomain *syncs.Map[netip.Addr, whereWhen]
}

// StopConsensus is part of the IPPool interface. It stops the raft background routines that handle consensus.
func (ipp *ConsensusIPPool) StopConsensus(ctx context.Context) error {
	return (ipp.consensus).(*tsconsensus.Consensus).Stop(ctx)
}

// unusedIPV4 finds the next unused or expired IP address in the pool.
// IP addresses in the pool should be reused if they haven't been used for some period of time.
// reuseDeadline is the time before which addresses are considered to be expired.
// So if addresses are reused after 24 hours of disuse, reuseDeadline would be 24 hours ago.
func (ps *consensusPerPeerState) unusedIPV4(ipset *netipx.IPSet, reuseDeadline time.Time) (netip.Addr, bool, string, error) {
	// If we want to have a random IP choice behavior we could make that work with the state machine by doing something like
	// passing the randomly chosen IP into the state machine call (so replaying logs would still be deterministic).
	for _, r := range ipset.Ranges() {
		ip := r.From()
		toIP := r.To()
		if !ip.IsValid() || !toIP.IsValid() {
			continue
		}
		for toIP.Compare(ip) != -1 {
			ww, ok := ps.addrToDomain.Load(ip)
			if !ok {
				return ip, false, "", nil
			}
			if ww.LastUsed.Before(reuseDeadline) {
				return ip, true, ww.Domain, nil
			}
			ip = ip.Next()
		}
	}
	return netip.Addr{}, false, "", errors.New("ip pool exhausted")
}

// isCloseToExpiry reports whether the lastUsed and now times are more than
// half the lifetime apart.
func isCloseToExpiry(lastUsed, now time.Time, lifetime time.Duration) bool {
	return now.Sub(lastUsed).Abs() > (lifetime / 2)
}

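// readDomainForIPArgs are the arguments for a readDomainForIP command, as marshaled into the raft log.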
type readDomainForIPArgs struct {
	NodeID tailcfg.NodeID
	Addr   netip.Addr
}

// executeReadDomainForIP parses a readDomainForIP log entry and applies it.
func (ipp *ConsensusIPPool) executeReadDomainForIP(bs []byte) tsconsensus.CommandResult {
	var args readDomainForIPArgs
	err := json.Unmarshal(bs, &args)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	return ipp.applyReadDomainForIP(args.NodeID, args.Addr)
}

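// applyReadDomainForIP looks up the domain associated with addr for the given node in the
// state machine and returns it, JSON-encoded, as a CommandResult. The domain is empty if
// there is no association.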
func (ipp *ConsensusIPPool) applyReadDomainForIP(from tailcfg.NodeID, addr netip.Addr) tsconsensus.CommandResult {
	domain := func() string {
		ps, ok := ipp.perPeerMap.Load(from)
		if !ok {
			return ""
		}
		ww, ok := ps.addrToDomain.Load(addr)
		if !ok {
			return ""
		}
		return ww.Domain
	}()
	resultBs, err := json.Marshal(domain)
	return tsconsensus.CommandResult{Result: resultBs, Err: err}
}

// readDomainForIP executes a readDomainForIP command on the leader with raft.
func (ipp *ConsensusIPPool) readDomainForIP(nid tailcfg.NodeID, addr netip.Addr) (string, error) {
	args := readDomainForIPArgs{
		NodeID: nid,
		Addr:   addr,
	}
	bs, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	c := tsconsensus.Command{
		Name: "readDomainForIP",
		Args: bs,
	}
	result, err := ipp.consensus.ExecuteCommand(c)
	if err != nil {
		log.Printf("readDomainForIP: raft error executing command: %v", err)
		return "", err
	}
	if result.Err != nil {
		log.Printf("readDomainForIP: error returned from state machine: %v", result.Err)
		return "", result.Err
	}
	var domain string
	err = json.Unmarshal(result.Result, &domain)
	return domain, err
}

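// markLastUsedArgs are the arguments for a markLastUsed command, as marshaled into the raft log.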
type markLastUsedArgs struct {
	NodeID    tailcfg.NodeID
	Addr      netip.Addr
	Domain    string
	UpdatedAt time.Time
}

// executeMarkLastUsed parses a markLastUsed log entry and applies it.
func (ipp *ConsensusIPPool) executeMarkLastUsed(bs []byte) tsconsensus.CommandResult {
	var args markLastUsedArgs
	err := json.Unmarshal(bs, &args)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	err = ipp.applyMarkLastUsed(args.NodeID, args.Addr, args.Domain, args.UpdatedAt)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	return tsconsensus.CommandResult{}
}

// applyMarkLastUsed applies the arguments from the log entry to the state. It updates an entry in the addrToDomain
// map with a new LastUsed timestamp.
// applyMarkLastUsed is not safe for concurrent access. It's only called from raft which will
// not call it concurrently.
func (ipp *ConsensusIPPool) applyMarkLastUsed(from tailcfg.NodeID, addr netip.Addr, domain string, updatedAt time.Time) error {
	ps, ok := ipp.perPeerMap.Load(from)
	if !ok {
		// There's nothing to mark. But this is unexpected, because we mark last used after we do things with peer state.
		log.Printf("applyMarkLastUsed: could not find peer state, nodeID: %d", from)
		return nil
	}
	ww, ok := ps.addrToDomain.Load(addr)
	if !ok {
		// The peer state didn't have an entry for the IP address (possibly it expired), so there's nothing to mark.
		return nil
	}
	if ww.Domain != domain {
		// The IP address expired and was reused for a new domain. Don't mark.
		return nil
	}
	if ww.LastUsed.After(updatedAt) {
		// This has been marked more recently. Don't mark.
		return nil
	}
	ww.LastUsed = updatedAt
	ps.addrToDomain.Store(addr, ww)
	return nil
}

// markLastUsed executes a markLastUsed command on the leader with raft.
func (ipp *ConsensusIPPool) markLastUsed(nid tailcfg.NodeID, addr netip.Addr, domain string, lastUsed time.Time) error {
	args := markLastUsedArgs{
		NodeID:    nid,
		Addr:      addr,
		Domain:    domain,
		UpdatedAt: lastUsed,
	}
	bs, err := json.Marshal(args)
	if err != nil {
		return err
	}
	c := tsconsensus.Command{
		Name: "markLastUsed",
		Args: bs,
	}
	result, err := ipp.consensus.ExecuteCommand(c)
	if err != nil {
		log.Printf("markLastUsed: raft error executing command: %v", err)
		return err
	}
	if result.Err != nil {
		log.Printf("markLastUsed: error returned from state machine: %v", result.Err)
		return result.Err
	}
	return nil
}

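// checkoutAddrArgs are the arguments for a checkoutAddr command, as marshaled into the raft log.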
type checkoutAddrArgs struct {
	NodeID        tailcfg.NodeID
	Domain        string
	ReuseDeadline time.Time
	UpdatedAt     time.Time
}

// executeCheckoutAddr parses a checkoutAddr raft log entry and applies it.
func (ipp *ConsensusIPPool) executeCheckoutAddr(bs []byte) tsconsensus.CommandResult {
	var args checkoutAddrArgs
	err := json.Unmarshal(bs, &args)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	addr, err := ipp.applyCheckoutAddr(args.NodeID, args.Domain, args.ReuseDeadline, args.UpdatedAt)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	resultBs, err := json.Marshal(addr)
	if err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	return tsconsensus.CommandResult{Result: resultBs}
}

// applyCheckoutAddr finds the IP address for a nid+domain pair.
// Each nid can use all of the addresses in the pool.
// updatedAt is the current time, the time at which we are wanting to get a new IP address.
// reuseDeadline is the time before which addresses are considered to be expired.
// So if addresses are reused after 24 hours of disuse, updatedAt would be now
// and reuseDeadline would be 24 hours ago.
// It is not safe for concurrent access (it's only called from raft, which will not call concurrently
// so that's fine).
func (ipp *ConsensusIPPool) applyCheckoutAddr(nid tailcfg.NodeID, domain string, reuseDeadline, updatedAt time.Time) (netip.Addr, error) {
	ps, ok := ipp.perPeerMap.Load(nid)
	if !ok {
		ps = &consensusPerPeerState{
			addrToDomain: &syncs.Map[netip.Addr, whereWhen]{},
		}
		ipp.perPeerMap.Store(nid, ps)
	}
	if existing, ok := ps.domainToAddr[domain]; ok {
		ww, ok := ps.addrToDomain.Load(existing)
		if ok {
			ww.LastUsed = updatedAt
			ps.addrToDomain.Store(existing, ww)
			return existing, nil
		}
		log.Printf("applyCheckoutAddr: data out of sync, allocating new IP")
	}
	addr, wasInUse, previousDomain, err := ps.unusedIPV4(ipp.IPSet, reuseDeadline)
	if err != nil {
		return netip.Addr{}, err
	}
	mak.Set(&ps.domainToAddr, domain, addr)
	if wasInUse {
		delete(ps.domainToAddr, previousDomain)
	}
	ps.addrToDomain.Store(addr, whereWhen{Domain: domain, LastUsed: updatedAt})
	return addr, nil
}

// Apply is part of the raft.FSM interface. It takes an incoming log entry and applies it to the state.
func (ipp *ConsensusIPPool) Apply(l *raft.Log) any {
	var c tsconsensus.Command
	if err := json.Unmarshal(l.Data, &c); err != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
	}
	switch c.Name {
	case "checkoutAddr":
		return ipp.executeCheckoutAddr(c.Args)
	case "markLastUsed":
		return ipp.executeMarkLastUsed(c.Args)
	case "readDomainForIP":
		return ipp.executeReadDomainForIP(c.Args)
	default:
		panic(fmt.Sprintf("unrecognized command: %s", c.Name))
	}
}

// commandExecutor is an interface covering the routing parts of consensus,
// used to allow a fake in the tests.
type commandExecutor interface {
	ExecuteCommand(tsconsensus.Command) (tsconsensus.CommandResult, error)
}