tsconsensus,cmd/natc: add 'follower only' bootstrap option

Currently consensus has a bootstrap routine in which a tsnet node tries to
join each of the other nodes carrying the cluster tag, and if it cannot join
any of them it starts its own cluster.

That algorithm is racy and can result in split-brain (more than one
leader/cluster) if all the nodes for a cluster are started at the same
time.

Add a FollowOnly argument to the bootstrap function. If set, this tsnet
node will never lead; it will try (and retry with exponential backoff) to
follow any node it can contact.
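
For illustration, a minimal sketch (not part of this change) of how a caller might opt in to follower-only bootstrap through the new API. The import paths and the helper name startFollowerOnly are assumptions, and any raft.FSM implementation can stand in for fsm:

	package example

	import (
		"context"

		"github.com/hashicorp/raft"
		"tailscale.com/tsconsensus"
		"tailscale.com/tsnet"
	)

	// startFollowerOnly starts consensus on ts but never bootstraps a new cluster:
	// with FollowOnly set, tsconsensus keeps retrying (with exponential backoff)
	// to join an existing member tagged with tag, and returns an error if it
	// cannot join any of them.
	func startFollowerOnly(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, tag string) (*tsconsensus.Consensus, error) {
		cfg := tsconsensus.DefaultConfig()
		return tsconsensus.Start(ctx, ts, fsm, tsconsensus.BootstrapOpts{
			Tag:        tag,  // e.g. "tag:consensus" (hypothetical tag)
			FollowOnly: true, // only join an existing cluster; never lead a new one
		}, cfg)
	}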

Add a --follow-only flag to cmd/natc that uses this new tsconsensus
functionality.

Also slightly reorganize some arguments into opts structs.

Updates #14667

Signed-off-by: Fran Bull <fran@tailscale.com>
Author: Fran Bull, 2025-08-06 07:43:58 -07:00 (committed by franbull)
Commit: d986baa18f (parent d4b7200129)
4 changed files with 121 additions and 44 deletions

@@ -149,12 +149,21 @@ func (ipp *ConsensusIPPool) domainLookup(from tailcfg.NodeID, addr netip.Addr) (
 	return ww, true
 }
 
+type ClusterOpts struct {
+	Tag        string
+	StateDir   string
+	FollowOnly bool
+}
+
 // StartConsensus is part of the IPPool interface. It starts the raft background routines that handle consensus.
-func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string, clusterStateDir string) error {
+func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, opts ClusterOpts) error {
 	cfg := tsconsensus.DefaultConfig()
 	cfg.ServeDebugMonitor = true
-	cfg.StateDirPath = clusterStateDir
-	cns, err := tsconsensus.Start(ctx, ts, ipp, clusterTag, cfg)
+	cfg.StateDirPath = opts.StateDir
+	cns, err := tsconsensus.Start(ctx, ts, ipp, tsconsensus.BootstrapOpts{
+		Tag:        opts.Tag,
+		FollowOnly: opts.FollowOnly,
+	}, cfg)
 	if err != nil {
 		return err
 	}

@@ -50,18 +50,19 @@ func main() {
 	// Parse flags
 	fs := flag.NewFlagSet("natc", flag.ExitOnError)
 	var (
 		debugPort         = fs.Int("debug-port", 8893, "Listening port for debug/metrics endpoint")
 		hostname          = fs.String("hostname", "", "Hostname to register the service under")
 		siteID            = fs.Uint("site-id", 1, "an integer site ID to use for the ULA prefix which allows for multiple proxies to act in a HA configuration")
 		v4PfxStr          = fs.String("v4-pfx", "100.64.1.0/24", "comma-separated list of IPv4 prefixes to advertise")
 		dnsServers        = fs.String("dns-servers", "", "comma separated list of upstream DNS to use, including host and port (use system if empty)")
 		verboseTSNet      = fs.Bool("verbose-tsnet", false, "enable verbose logging in tsnet")
 		printULA          = fs.Bool("print-ula", false, "print the ULA prefix and exit")
 		ignoreDstPfxStr   = fs.String("ignore-destinations", "", "comma-separated list of prefixes to ignore")
 		wgPort            = fs.Uint("wg-port", 0, "udp port for wireguard and peer to peer traffic")
 		clusterTag        = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag")
 		server            = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server")
 		stateDir          = fs.String("state-dir", "", "path to directory in which to store app state")
+		clusterFollowOnly = fs.Bool("follow-only", false, "Try to find a leader with the cluster tag or exit.")
 	)
 	ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
@@ -163,7 +164,11 @@ func main() {
 		if err != nil {
 			log.Fatalf("Creating cluster state dir failed: %v", err)
 		}
-		err = cipp.StartConsensus(ctx, ts, *clusterTag, clusterStateDir)
+		err = cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{
+			Tag:        *clusterTag,
+			StateDir:   clusterStateDir,
+			FollowOnly: *clusterFollowOnly,
+		})
 		if err != nil {
 			log.Fatalf("StartConsensus: %v", err)
 		}

@@ -157,13 +157,18 @@ func (sl StreamLayer) Accept() (net.Conn, error) {
 	}
 }
 
+type BootstrapOpts struct {
+	Tag        string
+	FollowOnly bool
+}
+
 // Start returns a pointer to a running Consensus instance.
 // Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster
 // with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state
 // machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members'
 // state machines using Raft.
-func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config) (*Consensus, error) {
-	if clusterTag == "" {
+func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, bootstrapOpts BootstrapOpts, cfg Config) (*Consensus, error) {
+	if bootstrapOpts.Tag == "" {
 		return nil, errors.New("cluster tag must be provided")
 	}
@@ -185,7 +190,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 		shutdownCtxCancel: shutdownCtxCancel,
 	}
 
-	auth := newAuthorization(ts, clusterTag)
+	auth := newAuthorization(ts, bootstrapOpts.Tag)
 	err := auth.Refresh(shutdownCtx)
 	if err != nil {
 		return nil, fmt.Errorf("auth refresh: %w", err)
@@ -211,7 +216,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 	// we may already be in a consensus (see comment above before startRaft) but we're going to
 	// try to bootstrap anyway in case this is a fresh start.
-	err = c.bootstrap(auth.AllowedPeers())
+	err = c.bootstrap(shutdownCtx, auth, bootstrapOpts)
 	if err != nil {
 		if errors.Is(err, raft.ErrCantBootstrap) {
 			// Raft cluster state can be persisted, if we try to call raft.BootstrapCluster when
@@ -303,14 +308,59 @@ type Consensus struct {
 	shutdownCtxCancel context.CancelFunc
 }
 
+func (c *Consensus) bootstrapTryToJoinAnyTarget(targets views.Slice[*ipnstate.PeerStatus]) bool {
+	log.Printf("Bootstrap: Trying to find cluster: num targets to try: %d", targets.Len())
+	for _, p := range targets.All() {
+		if !p.Online {
+			log.Printf("Bootstrap: Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
+			continue
+		}
+		log.Printf("Bootstrap: Trying to find cluster: trying %s", p.TailscaleIPs[0])
+		err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{
+			RemoteHost: c.self.hostAddr.String(),
+			RemoteID:   c.self.id,
+		})
+		if err != nil {
+			log.Printf("Bootstrap: Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
+			continue
+		}
+		log.Printf("Bootstrap: Trying to find cluster: joined %s", p.TailscaleIPs[0])
+		return true
+	}
+	return false
+}
+
+func (c *Consensus) retryFollow(ctx context.Context, auth *authorization) bool {
+	waitFor := 500 * time.Millisecond
+	nRetries := 10
+	attemptCount := 1
+	for true {
+		log.Printf("Bootstrap: trying to follow any cluster member: attempt %v", attemptCount)
+		joined := c.bootstrapTryToJoinAnyTarget(auth.AllowedPeers())
+		if joined || attemptCount == nRetries {
+			return joined
+		}
+		log.Printf("Bootstrap: Failed to follow. Retrying in %v", waitFor)
+		time.Sleep(waitFor)
+		waitFor *= 2
+		attemptCount++
+		auth.Refresh(ctx)
+	}
+	return false
+}
+
 // bootstrap tries to join a raft cluster, or start one.
 //
 // We need to do the very first raft cluster configuration, but after that raft manages it.
 // bootstrap is called at start up, and we may not currently be aware of what the cluster config might be,
 // our node may already be in it. Try to join the raft cluster of all the other nodes we know about, and
-// if unsuccessful, assume we are the first and try to start our own.
+// if unsuccessful, assume we are the first and try to start our own. If the FollowOnly option is set, only try
+// to join, never start our own.
 //
-// It's possible for bootstrap to return an error, or start a errant breakaway cluster.
+// It's possible for bootstrap to start an errant breakaway cluster if for example all nodes are having a fresh
+// start, they're racing bootstrap and multiple nodes were unable to join a peer and so start their own new cluster.
+// To avoid this operators should either ensure bootstrap is called for a single node first and allow it to become
+// leader before starting the other nodes. Or start all but one node with the FollowOnly option.
 //
 // We have a list of expected cluster members already from control (the members of the tailnet with the tag)
 // so we could do the initial configuration with all servers specified.
@@ -318,27 +368,20 @@ type Consensus struct {
 // - We want to handle machines joining after start anyway.
 // - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now,
 //   so let each node opt in when able.
-func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
-	log.Printf("Trying to find cluster: num targets to try: %d", targets.Len())
-	for _, p := range targets.All() {
-		if !p.Online {
-			log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
-			continue
-		}
-		log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0])
-		err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{
-			RemoteHost: c.self.hostAddr.String(),
-			RemoteID:   c.self.id,
-		})
-		if err != nil {
-			log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
-			continue
-		}
-		log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0])
-		return nil
-	}
-	log.Printf("Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String())
+func (c *Consensus) bootstrap(ctx context.Context, auth *authorization, opts BootstrapOpts) error {
+	if opts.FollowOnly {
+		joined := c.retryFollow(ctx, auth)
+		if !joined {
+			return errors.New("unable to join cluster")
+		}
+		return nil
+	}
+	joined := c.bootstrapTryToJoinAnyTarget(auth.AllowedPeers())
+	if joined {
+		return nil
+	}
+	log.Printf("Bootstrap: Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String())
 	f := c.raft.BootstrapCluster(
 		raft.Configuration{
 			Servers: []raft.Server{

@@ -262,7 +262,7 @@ func TestStart(t *testing.T) {
 	waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag)
 
 	sm := &fsm{}
-	r, err := Start(ctx, one, sm, clusterTag, warnLogConfig())
+	r, err := Start(ctx, one, sm, BootstrapOpts{Tag: clusterTag}, warnLogConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -334,7 +334,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	t.Helper()
 	participants[0].sm = &fsm{}
 	myCfg := addIDedLogger("0", cfg)
-	first, err := Start(ctx, participants[0].ts, participants[0].sm, clusterTag, myCfg)
+	first, err := Start(ctx, participants[0].ts, participants[0].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -347,7 +347,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	for i := 1; i < len(participants); i++ {
 		participants[i].sm = &fsm{}
 		myCfg := addIDedLogger(fmt.Sprintf("%d", i), cfg)
-		c, err := Start(ctx, participants[i].ts, participants[i].sm, clusterTag, myCfg)
+		c, err := Start(ctx, participants[i].ts, participants[i].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -530,7 +530,7 @@ func TestFollowerFailover(t *testing.T) {
 	// follower comes back
 	smThreeAgain := &fsm{}
 	cfg = addIDedLogger("2 after restarting", warnLogConfig())
-	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, clusterTag, cfg)
+	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -565,7 +565,7 @@ func TestRejoin(t *testing.T) {
 	tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
 	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
 	smJoiner := &fsm{}
-	cJoiner, err := Start(ctx, tsJoiner, smJoiner, clusterTag, cfg)
+	cJoiner, err := Start(ctx, tsJoiner, smJoiner, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -744,3 +744,23 @@ func TestOnlyTaggedPeersCanJoin(t *testing.T) {
 			t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
 		}
 	}
 }
+
+func TestFollowOnly(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+
+	// start the leader
+	_, err := Start(ctx, ps[0].ts, ps[0].sm, BootstrapOpts{Tag: clusterTag}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// start the follower with FollowOnly
+	_, err = Start(ctx, ps[1].ts, ps[1].sm, BootstrapOpts{Tag: clusterTag, FollowOnly: true}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+}