diff --git a/cmd/natc/ippool/consensusippool.go b/cmd/natc/ippool/consensusippool.go index 3bc21bd03..46ec9fccf 100644 --- a/cmd/natc/ippool/consensusippool.go +++ b/cmd/natc/ippool/consensusippool.go @@ -149,12 +149,21 @@ func (ipp *ConsensusIPPool) domainLookup(from tailcfg.NodeID, addr netip.Addr) ( return ww, true } +type ClusterOpts struct { + Tag string + StateDir string + Follow string +} + // StartConsensus is part of the IPPool interface. It starts the raft background routines that handle consensus. -func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string, clusterStateDir string) error { +func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, opts ClusterOpts) error { cfg := tsconsensus.DefaultConfig() cfg.ServeDebugMonitor = true - cfg.StateDirPath = clusterStateDir - cns, err := tsconsensus.Start(ctx, ts, ipp, clusterTag, cfg) + cfg.StateDirPath = opts.StateDir + cns, err := tsconsensus.Start(ctx, ts, ipp, tsconsensus.BootstrapOpts{ + Tag: opts.Tag, + FollowAddr: opts.Follow, + }, cfg) if err != nil { return err } diff --git a/cmd/natc/natc.go b/cmd/natc/natc.go index fdbce3da1..1b20203c3 100644 --- a/cmd/natc/natc.go +++ b/cmd/natc/natc.go @@ -62,6 +62,7 @@ func main() { clusterTag = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag") server = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server") stateDir = fs.String("state-dir", "", "path to directory in which to store app state") + clusterFollow = fs.String("follow", "", "Try to follow a leader at the specified location or exit.") ) ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC")) @@ -163,7 +164,11 @@ func main() { if err != nil { log.Fatalf("Creating cluster state dir failed: %v", err) } - err = cipp.StartConsensus(ctx, ts, *clusterTag, clusterStateDir) + err = cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{ + Tag: *clusterTag, + StateDir: clusterStateDir, + Follow: *clusterFollow, + }) if err != nil { log.Fatalf("StartConsensus: %v", err) } diff --git a/tsconsensus/tsconsensus.go b/tsconsensus/tsconsensus.go index b6bf37310..ef8d8076a 100644 --- a/tsconsensus/tsconsensus.go +++ b/tsconsensus/tsconsensus.go @@ -157,13 +157,18 @@ func (sl StreamLayer) Accept() (net.Conn, error) { } } +type BootstrapOpts struct { + Tag string + FollowAddr string +} + // Start returns a pointer to a running Consensus instance. // Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster // with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state // machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members' // state machines using Raft. 
-func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config) (*Consensus, error) {
-	if clusterTag == "" {
+func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, bootstrapOpts BootstrapOpts, cfg Config) (*Consensus, error) {
+	if bootstrapOpts.Tag == "" {
 		return nil, errors.New("cluster tag must be provided")
 	}
 
@@ -185,7 +190,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 		shutdownCtxCancel: shutdownCtxCancel,
 	}
 
-	auth := newAuthorization(ts, clusterTag)
+	auth := newAuthorization(ts, bootstrapOpts.Tag)
 	err := auth.Refresh(shutdownCtx)
 	if err != nil {
 		return nil, fmt.Errorf("auth refresh: %w", err)
@@ -209,7 +214,10 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 	}
 	c.raft = r
 
-	c.bootstrap(auth.AllowedPeers())
+	err = c.bootstrap(auth.AllowedPeers(), bootstrapOpts)
+	if err != nil {
+		return nil, err
+	}
 
 	if cfg.ServeDebugMonitor {
 		srv, err = serveMonitor(&c, ts, netip.AddrPortFrom(c.self.hostAddr, cfg.MonitorPort).String())
@@ -304,11 +312,31 @@ type Consensus struct {
 // - We want to handle machines joining after start anyway.
 // - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now,
 //   so let each node opt in when able.
-func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
+func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus], opts BootstrapOpts) error {
+	if opts.FollowAddr != "" {
+		log.Printf("Bootstrap: Trying to join provided addr: %s", opts.FollowAddr)
+		retryWaitTime := 500 * time.Millisecond
+		attemptCount := 0
+		var err error
+		for {
+			err = c.commandClient.join(opts.FollowAddr, joinRequest{
+				RemoteHost: c.self.hostAddr.String(),
+				RemoteID:   c.self.id,
+			})
+			if err == nil || attemptCount >= 10 {
+				break
+			}
+			log.Printf("Bootstrap: Failed to follow: %v. Retrying in %v", err, retryWaitTime)
+			time.Sleep(retryWaitTime)
+			retryWaitTime *= 2
+			attemptCount++
+		}
+		return err
+	}
 	log.Printf("Trying to find cluster: num targets to try: %d", targets.Len())
 	for _, p := range targets.All() {
 		if !p.Online {
-			log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
+			log.Printf("Bootstrap: Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
 			continue
 		}
 		log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0])
@@ -317,10 +345,10 @@ func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
 			RemoteID:   c.self.id,
 		})
 		if err != nil {
-			log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
+			log.Printf("Bootstrap: Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
 			continue
 		}
-		log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0])
+		log.Printf("Bootstrap: Trying to find cluster: joined %s", p.TailscaleIPs[0])
 		return nil
 	}
 
diff --git a/tsconsensus/tsconsensus_test.go b/tsconsensus/tsconsensus_test.go
index bfb6b3e06..0c5d90744 100644
--- a/tsconsensus/tsconsensus_test.go
+++ b/tsconsensus/tsconsensus_test.go
@@ -262,7 +262,7 @@ func TestStart(t *testing.T) {
 	waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag)
 
 	sm := &fsm{}
-	r, err := Start(ctx, one, sm, clusterTag, warnLogConfig())
+	r, err := Start(ctx, one, sm, BootstrapOpts{Tag: clusterTag}, warnLogConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -334,7 +334,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	t.Helper()
 	participants[0].sm = &fsm{}
 	myCfg := addIDedLogger("0", cfg)
-	first, err := Start(ctx, participants[0].ts, participants[0].sm, clusterTag, myCfg)
+	first, err := Start(ctx, participants[0].ts, participants[0].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -347,7 +347,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	for i := 1; i < len(participants); i++ {
 		participants[i].sm = &fsm{}
 		myCfg := addIDedLogger(fmt.Sprintf("%d", i), cfg)
-		c, err := Start(ctx, participants[i].ts, participants[i].sm, clusterTag, myCfg)
+		c, err := Start(ctx, participants[i].ts, participants[i].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -530,7 +530,7 @@ func TestFollowerFailover(t *testing.T) {
 	// follower comes back
 	smThreeAgain := &fsm{}
 	cfg = addIDedLogger("2 after restarting", warnLogConfig())
-	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, clusterTag, cfg)
+	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -565,7 +565,7 @@ func TestRejoin(t *testing.T) {
 	tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
 	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
 	smJoiner := &fsm{}
-	cJoiner, err := Start(ctx, tsJoiner, smJoiner, clusterTag, cfg)
+	cJoiner, err := Start(ctx, tsJoiner, smJoiner, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -744,3 +744,49 @@ func TestOnlyTaggedPeersCanJoin(t *testing.T) {
 		t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
 	}
 }
+
+func TestFollowAddr(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+
+	// get the address for the node that's going to be the leader
+	lc, err := ps[0].ts.LocalClient()
+	if err != nil {
+		t.Fatal(err)
+	}
+	status, err := lc.Status(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	firstV4 := status.TailscaleIPs[0]
+
+	// start the second and third participants of the cluster with FollowAddr; Start won't
+	// return until they have joined the leader.
+	secondDone := make(chan error, 1)
+	thirdDone := make(chan error, 1)
+	startParticipantConsensus := func(p *participant, done chan error) {
+		_, err := Start(ctx, p.ts, p.sm, BootstrapOpts{Tag: clusterTag, FollowAddr: firstV4.String()}, cfg)
+		done <- err
+	}
+	go startParticipantConsensus(ps[1], secondDone)
+	go startParticipantConsensus(ps[2], thirdDone)
+
+	// start the leader
+	_, err = Start(ctx, ps[0].ts, ps[0].sm, BootstrapOpts{Tag: clusterTag}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// verify second and third start without error
+	err = <-secondDone
+	if err != nil {
+		t.Fatal("error starting second", err)
+	}
+	err = <-thirdDone
+	if err != nil {
+		t.Fatal("error starting third", err)
+	}
+}
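
For reference, a minimal sketch (not part of this patch) of how a caller might take the new follow path, mirroring TestFollowAddr above. Only the Start signature and the BootstrapOpts fields come from this change; the package import paths, the helper name followLeader, the cluster tag, and the leader address are assumptions for illustration.

package main

import (
	"context"

	"github.com/hashicorp/raft"
	"tailscale.com/tsconsensus"
	"tailscale.com/tsnet"
)

// followLeader starts consensus on ts and tries to join the leader at leaderAddr
// instead of searching the tailnet for tagged peers. With FollowAddr set, Start
// retries the join with backoff and returns an error if it never succeeds.
func followLeader(ctx context.Context, ts *tsnet.Server, sm raft.FSM, leaderAddr string) (*tsconsensus.Consensus, error) {
	cfg := tsconsensus.DefaultConfig()
	return tsconsensus.Start(ctx, ts, sm, tsconsensus.BootstrapOpts{
		Tag:        "tag:whatever", // placeholder cluster tag, as in the tests above
		FollowAddr: leaderAddr,     // placeholder leader address, e.g. the leader's Tailscale IP
	}, cfg)
}

In cmd/natc the same path is reached via the new --follow flag (and, given ff.WithEnvVarPrefix("TS_NATC"), presumably the TS_NATC_FOLLOW environment variable).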