cmd/natc,tsconsensus: add 'follower only' bootstrap option
Currently consensus has a bootstrap routine in which a tsnet node tries to
join each other node carrying the cluster tag, and starts its own cluster
only if it cannot join any of them. That algorithm is racy, and can result
in split brain (more than one leader/cluster) if all of the nodes for a
cluster are started at the same time.

Add a FollowAddr argument to the bootstrap function. If provided, this
tsnet node will never lead; it will try (and retry, with exponential
backoff) to follow the node at the provided address.

Add a --follow flag to cmd/natc that uses this new tsconsensus
functionality. Also slightly reorganize some arguments into opts structs.

Updates #14667

Signed-off-by: Fran Bull <fran@tailscale.com>
commit bd3128752c (parent d0cafc0a67)
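
Before the diff, a minimal sketch of how a caller might drive the new API. The tsconsensus.Start signature and the BootstrapOpts fields come from this commit; the import paths, hostname, tag, leader address, and the no-op raft.FSM are illustrative assumptions, not part of the change.

package main

import (
	"context"
	"io"
	"log"

	"github.com/hashicorp/raft"
	"tailscale.com/tsconsensus"
	"tailscale.com/tsnet"
)

// nopFSM stands in for a real state machine; callers supply a raft.FSM
// that applies replicated log entries to their own state.
type nopFSM struct{}

func (nopFSM) Apply(*raft.Log) any                 { return nil }
func (nopFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (nopFSM) Restore(io.ReadCloser) error         { return nil }

func main() {
	ts := &tsnet.Server{Hostname: "consensus-follower"} // assumed hostname
	cfg := tsconsensus.DefaultConfig()
	// With FollowAddr set this node never bootstraps its own cluster:
	// Start retries joining the given address with exponential backoff.
	_, err := tsconsensus.Start(context.Background(), ts, nopFSM{}, tsconsensus.BootstrapOpts{
		Tag:        "tag:consensus", // assumed cluster tag
		FollowAddr: "100.64.0.1",    // assumed address of the intended leader
	}, cfg)
	if err != nil {
		log.Fatal(err)
	}
}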
@@ -149,12 +149,21 @@ func (ipp *ConsensusIPPool) domainLookup(from tailcfg.NodeID, addr netip.Addr) (
 	return ww, true
 }
 
+type ClusterOpts struct {
+	Tag      string
+	StateDir string
+	Follow   string
+}
+
 // StartConsensus is part of the IPPool interface. It starts the raft background routines that handle consensus.
-func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string, clusterStateDir string) error {
+func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server, opts ClusterOpts) error {
 	cfg := tsconsensus.DefaultConfig()
 	cfg.ServeDebugMonitor = true
-	cfg.StateDirPath = clusterStateDir
+	cfg.StateDirPath = opts.StateDir
-	cns, err := tsconsensus.Start(ctx, ts, ipp, clusterTag, cfg)
+	cns, err := tsconsensus.Start(ctx, ts, ipp, tsconsensus.BootstrapOpts{
+		Tag:        opts.Tag,
+		FollowAddr: opts.Follow,
+	}, cfg)
 	if err != nil {
 		return err
 	}
@@ -62,6 +62,7 @@ func main() {
 		clusterTag    = fs.String("cluster-tag", "", "optionally run in a consensus cluster with other nodes with this tag")
 		server        = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server")
 		stateDir      = fs.String("state-dir", "", "path to directory in which to store app state")
+		clusterFollow = fs.String("follow", "", "Try to follow a leader at the specified location or exit.")
 	)
 	ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
 
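
Since the flags are parsed with ff.WithEnvVarPrefix("TS_NATC"), the new flag should also be settable via a TS_NATC_FOLLOW environment variable. A hypothetical follower-only invocation (binary name, tag, path, and address all assumed):

natc --cluster-tag=tag:natc --state-dir=/var/lib/natc --follow=100.64.0.1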
@@ -163,7 +164,11 @@ func main() {
 		if err != nil {
 			log.Fatalf("Creating cluster state dir failed: %v", err)
 		}
-		err = cipp.StartConsensus(ctx, ts, *clusterTag, clusterStateDir)
+		err = cipp.StartConsensus(ctx, ts, ippool.ClusterOpts{
+			Tag:      *clusterTag,
+			StateDir: clusterStateDir,
+			Follow:   *clusterFollow,
+		})
 		if err != nil {
 			log.Fatalf("StartConsensus: %v", err)
 		}
@@ -157,13 +157,18 @@ func (sl StreamLayer) Accept() (net.Conn, error) {
 	}
 }
 
+type BootstrapOpts struct {
+	Tag        string
+	FollowAddr string
+}
+
 // Start returns a pointer to a running Consensus instance.
 // Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster
 // with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state
 // machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members'
 // state machines using Raft.
-func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config) (*Consensus, error) {
+func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, bootstrapOpts BootstrapOpts, cfg Config) (*Consensus, error) {
-	if clusterTag == "" {
+	if bootstrapOpts.Tag == "" {
 		return nil, errors.New("cluster tag must be provided")
 	}
 
@@ -185,7 +190,7 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 		shutdownCtxCancel: shutdownCtxCancel,
 	}
 
-	auth := newAuthorization(ts, clusterTag)
+	auth := newAuthorization(ts, bootstrapOpts.Tag)
 	err := auth.Refresh(shutdownCtx)
 	if err != nil {
 		return nil, fmt.Errorf("auth refresh: %w", err)
@@ -209,7 +214,10 @@ func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag strin
 	}
 	c.raft = r
 
-	c.bootstrap(auth.AllowedPeers())
+	err = c.bootstrap(auth.AllowedPeers(), bootstrapOpts)
+	if err != nil {
+		return nil, err
+	}
 
 	if cfg.ServeDebugMonitor {
 		srv, err = serveMonitor(&c, ts, netip.AddrPortFrom(c.self.hostAddr, cfg.MonitorPort).String())
@@ -304,11 +312,31 @@ type Consensus struct {
 // - We want to handle machines joining after start anyway.
 // - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now,
 //   so let each node opt in when able.
-func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
+func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus], opts BootstrapOpts) error {
+	if opts.FollowAddr != "" {
+		log.Printf("Bootstrap: Trying to join provided addr: %s", opts.FollowAddr)
+		retryWaitTime := 500 * time.Millisecond
+		attemptCount := 0
+		var err error
+		for true {
+			err = c.commandClient.join(opts.FollowAddr, joinRequest{
+				RemoteHost: c.self.hostAddr.String(),
+				RemoteID:   c.self.id,
+			})
+			if err == nil || attemptCount >= 10 {
+				break
+			}
+			log.Printf("Bootstrap: Failed to follow. Retrying in %v", retryWaitTime)
+			time.Sleep(retryWaitTime)
+			retryWaitTime *= 2
+			attemptCount++
+		}
+		return err
+	}
 	log.Printf("Trying to find cluster: num targets to try: %d", targets.Len())
 	for _, p := range targets.All() {
 		if !p.Online {
-			log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
+			log.Printf("Bootstrap: Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
 			continue
 		}
 		log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0])
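
A note on the retry budget implied by the loop above: the wait doubles from 500ms and up to 11 join attempts are made, so a follower gives up after roughly eight and a half minutes. A standalone back-of-envelope check, assuming nothing beyond the constants in the diff:

package main

import (
	"fmt"
	"time"
)

func main() {
	wait, total := 500*time.Millisecond, time.Duration(0)
	for attempt := 0; attempt < 10; attempt++ { // a sleep follows attempts 0 through 9
		total += wait
		wait *= 2
	}
	fmt.Println(total) // 8m31.5s of sleeping before the 11th and final attempt
}

And because Start now propagates bootstrap's error, a node whose follow target never becomes reachable fails Start rather than falling back to leading.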
@@ -317,10 +345,10 @@ func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
 			RemoteID:   c.self.id,
 		})
 		if err != nil {
-			log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
+			log.Printf("Bootstrap: Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
 			continue
 		}
-		log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0])
+		log.Printf("Bootstrap: Trying to find cluster: joined %s", p.TailscaleIPs[0])
 		return nil
 	}
 
@@ -262,7 +262,7 @@ func TestStart(t *testing.T) {
 	waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag)
 
 	sm := &fsm{}
-	r, err := Start(ctx, one, sm, clusterTag, warnLogConfig())
+	r, err := Start(ctx, one, sm, BootstrapOpts{Tag: clusterTag}, warnLogConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -334,7 +334,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	t.Helper()
 	participants[0].sm = &fsm{}
 	myCfg := addIDedLogger("0", cfg)
-	first, err := Start(ctx, participants[0].ts, participants[0].sm, clusterTag, myCfg)
+	first, err := Start(ctx, participants[0].ts, participants[0].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -347,7 +347,7 @@ func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string
 	for i := 1; i < len(participants); i++ {
 		participants[i].sm = &fsm{}
 		myCfg := addIDedLogger(fmt.Sprintf("%d", i), cfg)
-		c, err := Start(ctx, participants[i].ts, participants[i].sm, clusterTag, myCfg)
+		c, err := Start(ctx, participants[i].ts, participants[i].sm, BootstrapOpts{Tag: clusterTag}, myCfg)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -530,7 +530,7 @@ func TestFollowerFailover(t *testing.T) {
 	// follower comes back
 	smThreeAgain := &fsm{}
 	cfg = addIDedLogger("2 after restarting", warnLogConfig())
-	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, clusterTag, cfg)
+	rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -565,7 +565,7 @@ func TestRejoin(t *testing.T) {
 	tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
 	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
 	smJoiner := &fsm{}
-	cJoiner, err := Start(ctx, tsJoiner, smJoiner, clusterTag, cfg)
+	cJoiner, err := Start(ctx, tsJoiner, smJoiner, BootstrapOpts{Tag: clusterTag}, cfg)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -744,3 +744,49 @@ func TestOnlyTaggedPeersCanJoin(t *testing.T) {
 		t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
 	}
 }
+
+func TestFollowAddr(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+
+	// get the address for the node that's going to be the leader
+	lc, err := ps[0].ts.LocalClient()
+	if err != nil {
+		t.Fatal(err)
+	}
+	status, err := lc.Status(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	firstV4 := status.TailscaleIPs[0]
+
+	// start the second and third participants of the cluster with FollowAddr, Start won't
+	// return until they have joined the leader.
+	secondDone := make(chan error, 1)
+	thirdDone := make(chan error, 1)
+	startParticipantConsensus := func(p *participant, done chan error) {
+		_, err := Start(ctx, p.ts, p.sm, BootstrapOpts{Tag: clusterTag, FollowAddr: firstV4.String()}, cfg)
+		done <- err
+	}
+	go startParticipantConsensus(ps[1], secondDone)
+	go startParticipantConsensus(ps[2], thirdDone)
+
+	// start the leader
+	_, err = Start(ctx, ps[0].ts, ps[0].sm, BootstrapOpts{Tag: clusterTag}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// verify second and third start without error
+	err = <-secondDone
+	if err != nil {
+		t.Fatal("error starting second", err)
+	}
+	err = <-thirdDone
+	if err != nil {
+		t.Fatal("error starting third", err)
+	}
+}
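
Note that the test starts the two followers before the leader, so their initial join attempts presumably fail and the exponential-backoff path is exercised until the leader's Start makes joining possible.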