diff --git a/cmd/natc/ippool/consensusippool.go b/cmd/natc/ippool/consensusippool.go
index 3bc21bd03..821f12fae 100644
--- a/cmd/natc/ippool/consensusippool.go
+++ b/cmd/natc/ippool/consensusippool.go
@@ -149,12 +149,21 @@ func (ipp *ConsensusIPPool) domainLookup(from tailcfg.NodeID, addr netip.Addr) (
 	return ww, true
 }
 
+type ClusterOpts struct {
+	Tag        string
+	StateDir   string
+	FollowOnly bool
+}
+
 // StartConsensus is part of the IPPool interface. It starts the raft background routines that handle consensus.
-func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string, clusterStateDir string) error {
+func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, opts ClusterOpts) error {
 	cfg := tsconsensus.DefaultConfig()
 	cfg.ServeDebugMonitor = true
-	cfg.StateDirPath = clusterStateDir
-	cns, err := tsconsensus.Start(ctx, ts, ipp, clusterTag, cfg)
+	cfg.StateDirPath = opts.StateDir
+	cns, err := tsconsensus.Start(ctx, ts, ipp, tsconsensus.BootstrapOpts{
+		Tag:        opts.Tag,
+		FollowOnly: opts.FollowOnly,
+	}, cfg)
 	if err != nil {
 		return err
 	}
diff --git a/cmd/natc/natc.go b/cmd/natc/natc.go
index fdbce3da1..2007f0a24 100644
--- a/cmd/natc/natc.go
+++ b/cmd/natc/natc.go
@@ -50,18 +50,19 @@ func main() {
 	// Parse flags
 	fs := flag.NewFlagSet("natc", flag.ExitOnError)
 	var (
-		debugPort       = fs.Int("debug-port", 8893, "Listening port for debug/metrics endpoint")
-		hostname        = fs.String("hostname", "", "Hostname to register the service under")
-		siteID          = fs.Uint("site-id", 1, "an integer site ID to use for the ULA prefix which allows for multiple proxies to act in a HA configuration")
-		v4PfxStr        = fs.String("v4-pfx", "100.64.1.0/24", "comma-separated list of IPv4 prefixes to advertise")
-		dnsServers      = fs.String("dns-servers", "", "comma separated list of upstream DNS to use, including host and port (use system if empty)")
-		verboseTSNet    = fs.Bool("verbose-tsnet", false, "enable verbose logging in tsnet")
-		printULA        = fs.Bool("print-ula", false, "print the ULA prefix and exit")
-		ignoreDstPfxStr = fs.String("ignore-destinations", "", "comma-separated list of prefixes to ignore")
-		wgPort          = fs.Uint("wg-port", 0, "udp port for wireguard and peer to peer traffic")
-		clusterTag      = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag")
-		server          = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server")
-		stateDir        = fs.String("state-dir", "", "path to directory in which to store app state")
+		debugPort         = fs.Int("debug-port", 8893, "Listening port for debug/metrics endpoint")
+		hostname          = fs.String("hostname", "", "Hostname to register the service under")
+		siteID            = fs.Uint("site-id", 1, "an integer site ID to use for the ULA prefix which allows for multiple proxies to act in a HA configuration")
+		v4PfxStr          = fs.String("v4-pfx", "100.64.1.0/24", "comma-separated list of IPv4 prefixes to advertise")
+		dnsServers        = fs.String("dns-servers", "", "comma separated list of upstream DNS to use, including host and port (use system if empty)")
+		verboseTSNet      = fs.Bool("verbose-tsnet", false, "enable verbose logging in tsnet")
+		printULA          = fs.Bool("print-ula", false, "print the ULA prefix and exit")
+		ignoreDstPfxStr   = fs.String("ignore-destinations", "", "comma-separated list of prefixes to ignore")
+		wgPort            = fs.Uint("wg-port", 0, "udp port for wireguard and peer to peer traffic")
+		clusterTag        = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag")
+		server            = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server")
+		stateDir          = fs.String("state-dir", "", "path to directory in which to store app state")
+		clusterFollowOnly = fs.Bool("follow-only", false, "only try to join an existing cluster with the cluster tag, never start a new one; exit if joining fails")
 	)
 	ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
@@ -163,7 +164,11 @@ func main() {
 	if err != nil {
 		log.Fatalf("Creating cluster state dir failed: %v", err)
 	}
-	err = cipp.StartConsensus(ctx, ts, *clusterTag, clusterStateDir)
+	err = cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{
+		Tag:        *clusterTag,
+		StateDir:   clusterStateDir,
+		FollowOnly: *clusterFollowOnly,
+	})
 	if err != nil {
 		log.Fatalf("StartConsensus: %v", err)
 	}
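For code that consumes the new ippool API directly rather than through natc's flags, a minimal caller sketch (the tag, state directory, and the ts/cipp variables are illustrative assumptions, not values taken from this patch):

	// Assumes an already-configured *tsnet.Server (ts) and *ippool.ConsensusIPPool (cipp).
	err := cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{
		Tag:        "tag:natc-cluster",      // illustrative ACL tag shared by every cluster node
		StateDir:   "/var/lib/natc/cluster", // illustrative directory for persisted raft state
		FollowOnly: true,                    // only join an existing cluster; never bootstrap a new one
	})
	if err != nil {
		log.Fatalf("StartConsensus: %v", err)
	}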
tag") + server = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server") + stateDir = fs.String("state-dir", "", "path to directory in which to store app state") + clusterFollowOnly = fs.Bool("follow-only", false, "Try to find a leader with the cluster tag or exit.") ) ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC")) @@ -163,7 +164,11 @@ func main() { if err != nil { log.Fatalf("Creating cluster state dir failed: %v", err) } - err = cipp.StartConsensus(ctx, ts, *clusterTag, clusterStateDir) + err = cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{ + Tag: *clusterTag, + StateDir: clusterStateDir, + FollowOnly: *clusterFollowOnly, + }) if err != nil { log.Fatalf("StartConsensus: %v", err) } diff --git a/tsconsensus/tsconsensus.go b/tsconsensus/tsconsensus.go index 53a2c3f54..11b039d57 100644 --- a/tsconsensus/tsconsensus.go +++ b/tsconsensus/tsconsensus.go @@ -157,13 +157,18 @@ func (sl StreamLayer) Accept() (net.Conn, error) { } } +type BootstrapOpts struct { + Tag string + FollowOnly bool +} + // Start returns a pointer to a running Consensus instance. // Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster // with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state // machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members' // state machines using Raft. -func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config) (*Consensus, error) { - if clusterTag == "" { +func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, bootstrapOpts BootstrapOpts, cfg Config) (*Consensus, error) { + if bootstrapOpts.Tag == "" { return nil, errors.New("cluster tag must be provided") } @@ -185,7 +190,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin shutdownCtxCancel: shutdownCtxCancel, } - auth := newAuthorization(ts, clusterTag) + auth := newAuthorization(ts, bootstrapOpts.Tag) err := auth.Refresh(shutdownCtx) if err != nil { return nil, fmt.Errorf("auth refresh: %w", err) @@ -211,7 +216,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin // we may already be in a consensus (see comment above before startRaft) but we're going to // try to bootstrap anyway in case this is a fresh start. 
@@ -303,14 +308,59 @@ type Consensus struct {
 	shutdownCtxCancel context.CancelFunc
 }
 
+func (c *Consensus) bootstrapTryToJoinAnyTarget(targets views.Slice[*ipnstate.PeerStatus]) bool {
+	log.Printf("Bootstrap: Trying to find cluster: num targets to try: %d", targets.Len())
+	for _, p := range targets.All() {
+		if !p.Online {
+			log.Printf("Bootstrap: Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
+			continue
+		}
+		log.Printf("Bootstrap: Trying to find cluster: trying %s", p.TailscaleIPs[0])
+		err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{
+			RemoteHost: c.self.hostAddr.String(),
+			RemoteID:   c.self.id,
+		})
+		if err != nil {
+			log.Printf("Bootstrap: Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
+			continue
+		}
+		log.Printf("Bootstrap: Trying to find cluster: joined %s", p.TailscaleIPs[0])
+		return true
+	}
+	return false
+}
+
+func (c *Consensus) retryFollow(ctx context.Context, auth *authorization) bool {
+	waitFor := 500 * time.Millisecond
+	nRetries := 10
+	for attemptCount := 1; ; attemptCount++ {
+		log.Printf("Bootstrap: trying to follow any cluster member: attempt %v", attemptCount)
+		joined := c.bootstrapTryToJoinAnyTarget(auth.AllowedPeers())
+		if joined || attemptCount == nRetries {
+			return joined
+		}
+		log.Printf("Bootstrap: Failed to follow. Retrying in %v", waitFor)
+		time.Sleep(waitFor)
+		waitFor *= 2
+		if err := auth.Refresh(ctx); err != nil {
+			log.Printf("Bootstrap: could not refresh allowed peers: %v", err)
+		}
+	}
+}
+
 // bootstrap tries to join a raft cluster, or start one.
 //
 // We need to do the very first raft cluster configuration, but after that raft manages it.
 // bootstrap is called at start up, and we may not currently be aware of what the cluster config might be,
 // our node may already be in it. Try to join the raft cluster of all the other nodes we know about, and
-// if unsuccessful, assume we are the first and try to start our own.
+// if unsuccessful, assume we are the first and try to start our own. If the FollowOnly option is set, only try
+// to join, never start our own.
 //
-// It's possible for bootstrap to return an error, or start a errant breakaway cluster.
+// It's possible for bootstrap to start an errant breakaway cluster if, for example, all nodes start fresh at
+// the same time: they race through bootstrap, and any node that fails to join a peer starts a new cluster of
+// its own. To avoid this, operators should either bootstrap a single node first and let it become leader before
+// starting the others, or start all but one node with the FollowOnly option.
 //
 // We have a list of expected cluster members already from control (the members of the tailnet with the tag)
 // so we could do the initial configuration with all servers specified.
@@ -318,27 +368,20 @@ type Consensus struct {
 // - We want to handle machines joining after start anyway.
 // - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now,
 //   so let each node opt in when able.
-func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
-	log.Printf("Trying to find cluster: num targets to try: %d", targets.Len())
-	for _, p := range targets.All() {
-		if !p.Online {
-			log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
-			continue
+func (c *Consensus) bootstrap(ctx context.Context, auth *authorization, opts BootstrapOpts) error {
+	if opts.FollowOnly {
+		joined := c.retryFollow(ctx, auth)
+		if !joined {
+			return errors.New("unable to join cluster")
 		}
-		log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0])
-		err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{
-			RemoteHost: c.self.hostAddr.String(),
-			RemoteID:   c.self.id,
-		})
-		if err != nil {
-			log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
-			continue
-		}
-		log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0])
 		return nil
 	}
-	log.Printf("Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String())
+	joined := c.bootstrapTryToJoinAnyTarget(auth.AllowedPeers())
+	if joined {
+		return nil
+	}
+	log.Printf("Bootstrap: Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String())
 	f := c.raft.BootstrapCluster(
 		raft.Configuration{
 			Servers: []raft.Server{
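The retryFollow helper introduced above makes 10 attempts, sleeping between attempts with the delay doubling from 500ms, so a follow-only node spends roughly four and a quarter minutes waiting before it gives up. A standalone sketch of that arithmetic (not part of the patch):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		waitFor := 500 * time.Millisecond
		var total time.Duration
		// 10 attempts means 9 sleeps; the delay doubles after each failed attempt.
		for attempt := 1; attempt < 10; attempt++ {
			total += waitFor
			waitFor *= 2
		}
		fmt.Println(total) // prints 4m15.5s
	}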
diff --git a/tsconsensus/tsconsensus_test.go b/tsconsensus/tsconsensus_test.go
index bfb6b3e06..3b51a093f 100644
--- a/tsconsensus/tsconsensus_test.go
+++ b/tsconsensus/tsconsensus_test.go
@@ -262,7 +262,7 @@ func TestStart(t *testing.T) {
 	waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag)
 
 	sm := &fsm{}
-	r, err := Start(ctx, one, sm, clusterTag, warnLogConfig())
+	r, err := Start(ctx, one, sm, BootstrapOpts{Tag: clusterTag}, warnLogConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -334,7 +334,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	t.Helper()
 	participants[0].sm = &fsm{}
 	myCfg := addIDedLogger("0", cfg)
-	first, err := Start(ctx, participants[0].ts, participants[0].sm, clusterTag, myCfg)
+	first, err := Start(ctx, participants[0].ts, participants[0].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -347,7 +347,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	for i := 1; i < len(participants); i++ {
 		participants[i].sm = &fsm{}
 		myCfg := addIDedLogger(fmt.Sprintf("%d", i), cfg)
-		c, err := Start(ctx, participants[i].ts, participants[i].sm, clusterTag, myCfg)
+		c, err := Start(ctx, participants[i].ts, participants[i].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -530,7 +530,7 @@ func TestFollowerFailover(t *testing.T) {
 	// follower comes back
 	smThreeAgain := &fsm{}
 	cfg = addIDedLogger("2 after restarting", warnLogConfig())
-	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, clusterTag, cfg)
+	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -565,7 +565,7 @@ func TestRejoin(t *testing.T) {
 	tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
 	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
 	smJoiner := &fsm{}
-	cJoiner, err := Start(ctx, tsJoiner, smJoiner, clusterTag, cfg)
+	cJoiner, err := Start(ctx, tsJoiner, smJoiner, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -744,3 +744,23 @@ func TestOnlyTaggedPeersCanJoin(t *testing.T) {
 		t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
 	}
 }
+
+func TestFollowOnly(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+
+	// start the leader
+	ps[0].sm = &fsm{}
+	_, err := Start(ctx, ps[0].ts, ps[0].sm, BootstrapOpts{Tag: clusterTag}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// start the follower with FollowOnly
+	ps[1].sm = &fsm{}
+	_, err = Start(ctx, ps[1].ts, ps[1].sm, BootstrapOpts{Tag: clusterTag, FollowOnly: true}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
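A possible companion test, sketched here rather than written out as part of the patch because it would sit in retryFollow's backoff for several minutes before failing: with FollowOnly set and no existing cluster to join, Start should return an error instead of bootstrapping a new cluster.

	func TestFollowOnlyNoCluster(t *testing.T) {
		testConfig(t)
		ctx := context.Background()
		clusterTag := "tag:whatever"
		ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 2)
		cfg := warnLogConfig()

		// No node ever bootstraps, so there is no cluster for a follow-only node to join.
		ps[0].sm = &fsm{}
		_, err := Start(ctx, ps[0].ts, ps[0].sm, BootstrapOpts{Tag: clusterTag, FollowOnly: true}, cfg)
		if err == nil {
			t.Fatal("expected Start to fail when FollowOnly is set and there is no cluster to join")
		}
	}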