diff --git a/go.mod b/go.mod index a566c941f..7f60e32d6 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,8 @@ require ( github.com/google/nftables v0.2.1-0.20240414091927-5e242ec57806 github.com/google/uuid v1.6.0 github.com/goreleaser/nfpm/v2 v2.33.1 + github.com/hashicorp/go-hclog v1.6.2 + github.com/hashicorp/raft v1.7.2 github.com/hdevalence/ed25519consensus v0.2.0 github.com/illarion/gonotify/v3 v3.0.2 github.com/inetaf/tcpproxy v0.0.0-20250203165043-ded522cbd03f @@ -128,6 +130,7 @@ require ( github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect github.com/alecthomas/go-check-sumtype v0.1.4 // indirect github.com/alexkohler/nakedret/v2 v2.0.4 // indirect + github.com/armon/go-metrics v0.4.1 // indirect github.com/bombsimon/wsl/v4 v4.2.1 // indirect github.com/butuzov/mirror v1.1.0 // indirect github.com/catenacyber/perfsprint v0.7.1 // indirect @@ -146,6 +149,10 @@ require ( github.com/golangci/plugin-module-register v0.1.1 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/gorilla/securecookie v1.1.2 // indirect + github.com/hashicorp/go-immutable-radix v1.3.1 // indirect + github.com/hashicorp/go-metrics v0.5.4 // indirect + github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect + github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/jjti/go-spancheck v0.5.3 // indirect github.com/karamaru-alpha/copyloopvar v1.0.8 // indirect github.com/macabu/inamedparam v0.1.3 // indirect diff --git a/go.sum b/go.sum index 528e48c16..3c168cdd5 100644 --- a/go.sum +++ b/go.sum @@ -61,8 +61,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c h1:pxW6RcqyfI9/kWtOwnv/G+AzdKuy2ZrqINhenH4HyNs= github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= -github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Djarvur/go-err113 v0.1.0 h1:uCRZZOdMQ0TZPHYTdYpoC0bLYJKPEHPUJ8MeAa51lNU= github.com/Djarvur/go-err113 v0.1.0/go.mod h1:4UJr5HIiMZrwgkSPdsjy2uOQExX/WEILpIrO9UPGuXs= github.com/GaijinEntertainment/go-exhaustruct/v3 v3.2.0 h1:sATXp1x6/axKxz2Gjxv8MALP0bXaNRfQinEwyfMcx8c= @@ -114,6 +115,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= +github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= +github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a/go.mod h1:QmP9hvJ91BbJmGVGSbutW19IC0Q9phDCLGaomwTJbgU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= 
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= @@ -212,6 +215,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk= github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/ckaznocha/intrange v0.1.0 h1:ZiGBhvrdsKpoEfzh9CjBfDSZof6QB0ORY5tXasUtiew= github.com/ckaznocha/intrange v0.1.0/go.mod h1:Vwa9Ekex2BrEQMg6zlrWwbs/FtYw7eS5838Q7UjK7TQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -290,6 +295,7 @@ github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0 github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/evanw/esbuild v0.19.11 h1:mbPO1VJ/df//jjUd+p/nRLYCpizXxXb2w/zZMShxa2k= github.com/evanw/esbuild v0.19.11/go.mod h1:D2vIQZqV/vIf/VRHtViaUtViZmG7o+kKmlBfVQuRi48= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= @@ -531,13 +537,30 @@ github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Rep github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I= +github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= +github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY= +github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI= +github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0= +github.com/hashicorp/go-msgpack/v2 v2.1.2/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod 
h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4= +github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/raft v1.7.2 h1:pyvxhfJ4R8VIAlHKvLoKQWElZspsCVT6YWuxVxsPAgc= +github.com/hashicorp/raft v1.7.2/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ= github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU= github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= @@ -582,6 +605,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/jsimonetti/rtnetlink v1.4.0 h1:Z1BF0fRgcETPEa0Kt0MRk3yV5+kF1FWTni6KUFKrq2I= github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4oskfOqvPteYS6E= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -654,8 +678,12 @@ github.com/matoous/godox v0.0.0-20230222163458-006bad1f9d26 h1:gWg6ZQ4JhDfJPqlo2 github.com/matoous/godox v0.0.0-20230222163458-006bad1f9d26/go.mod h1:1BELzlh859Sh1c6+90blK8lbYy0kwQf1bYlBhBysy1s= github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= @@ -737,6 +765,8 @@ github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJ github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs= github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo= github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= +github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= +github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml/v2 v2.2.0 
h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeBAsoHo= github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc= @@ -765,8 +795,10 @@ github.com/prometheus-community/pro-bing v0.4.0 h1:YMbv+i08gQz97OZZBwLyvmmQEEzyf github.com/prometheus-community/pro-bing v0.4.0/go.mod h1:b7wRYZtCcPmt4Sz319BykUU241rWLe1VFXyiyWK/dH4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= @@ -777,6 +809,7 @@ github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6T github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= @@ -784,6 +817,7 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= @@ -884,6 +918,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -950,6 +985,7 @@ github.com/tommy-muehle/go-mnd/v2 v2.5.1 h1:NowYhSdyE/1zwK9QCLeRb6USWdoif80Ie+v+ github.com/tommy-muehle/go-mnd/v2 v2.5.1/go.mod h1:WsUAkMJMYww6l/ufffCD3m+P7LEvr8TnZn9lwVDlgzw= github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ= github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/u-root/gobusybox/src v0.0.0-20231228173702-b69f654846aa h1:unMPGGK/CRzfg923allsikmvk2l7beBeFPUNC4RVX/8= github.com/u-root/gobusybox/src v0.0.0-20231228173702-b69f654846aa/go.mod h1:Zj4Tt22fJVn/nz/y6Ergm1SahR9dio1Zm/D2/S0TmXM= github.com/u-root/u-root v0.12.0 h1:K0AuBFriwr0w/PGS3HawiAw89e3+MU7ks80GpghAsNs= @@ -1178,6 +1214,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1203,9 +1240,12 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211105183446-c75c47738b0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/tsconsensus/authorization.go b/tsconsensus/authorization.go new file mode 100644 index 000000000..e19ce0744 --- /dev/null +++ b/tsconsensus/authorization.go @@ -0,0 +1,138 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package tsconsensus + +import ( + "context" + "errors" + "net/netip" + "sync" + "time" + + "tailscale.com/ipn" + "tailscale.com/ipn/ipnstate" + "tailscale.com/tsnet" + 
"tailscale.com/types/views" + "tailscale.com/util/set" +) + +type statusGetter interface { + getStatus(context.Context) (*ipnstate.Status, error) +} + +type tailscaleStatusGetter struct { + ts *tsnet.Server + + mu sync.Mutex // protects the following + lastStatus *ipnstate.Status + lastStatusTime time.Time +} + +func (sg *tailscaleStatusGetter) fetchStatus(ctx context.Context) (*ipnstate.Status, error) { + lc, err := sg.ts.LocalClient() + if err != nil { + return nil, err + } + return lc.Status(ctx) +} + +func (sg *tailscaleStatusGetter) getStatus(ctx context.Context) (*ipnstate.Status, error) { + sg.mu.Lock() + defer sg.mu.Unlock() + if sg.lastStatus != nil && time.Since(sg.lastStatusTime) < 1*time.Second { + return sg.lastStatus, nil + } + status, err := sg.fetchStatus(ctx) + if err != nil { + return nil, err + } + sg.lastStatus = status + sg.lastStatusTime = time.Now() + return status, nil +} + +type authorization struct { + sg statusGetter + tag string + + mu sync.Mutex + peers *peers // protected by mu +} + +func newAuthorization(ts *tsnet.Server, tag string) *authorization { + return &authorization{ + sg: &tailscaleStatusGetter{ + ts: ts, + }, + tag: tag, + } +} + +func (a *authorization) Refresh(ctx context.Context) error { + tStatus, err := a.sg.getStatus(ctx) + if err != nil { + return err + } + if tStatus == nil { + return errors.New("no status") + } + if tStatus.BackendState != ipn.Running.String() { + return errors.New("ts Server is not running") + } + a.mu.Lock() + defer a.mu.Unlock() + a.peers = newPeers(tStatus, a.tag) + return nil +} + +func (a *authorization) AllowsHost(addr netip.Addr) bool { + if a.peers == nil { + return false + } + a.mu.Lock() + defer a.mu.Unlock() + return a.peers.peerExists(addr, a.tag) +} + +func (a *authorization) SelfAllowed() bool { + if a.peers == nil { + return false + } + a.mu.Lock() + defer a.mu.Unlock() + return a.peers.status.Self.Tags != nil && views.SliceContains(*a.peers.status.Self.Tags, a.tag) +} + +func (a *authorization) AllowedPeers() views.Slice[*ipnstate.PeerStatus] { + if a.peers == nil { + return views.Slice[*ipnstate.PeerStatus]{} + } + a.mu.Lock() + defer a.mu.Unlock() + return views.SliceOf(a.peers.allowedPeers) +} + +type peers struct { + status *ipnstate.Status + allowedRemoteAddrs set.Set[netip.Addr] + allowedPeers []*ipnstate.PeerStatus +} + +func (ps *peers) peerExists(a netip.Addr, tag string) bool { + return ps.allowedRemoteAddrs.Contains(a) +} + +func newPeers(status *ipnstate.Status, tag string) *peers { + ps := &peers{ + status: status, + allowedRemoteAddrs: set.Set[netip.Addr]{}, + } + for _, p := range status.Peer { + if p.Tags != nil && views.SliceContains(*p.Tags, tag) { + ps.allowedPeers = append(ps.allowedPeers, p) + ps.allowedRemoteAddrs.AddSlice(p.TailscaleIPs) + } + } + return ps +} diff --git a/tsconsensus/authorization_test.go b/tsconsensus/authorization_test.go new file mode 100644 index 000000000..d2cd85d91 --- /dev/null +++ b/tsconsensus/authorization_test.go @@ -0,0 +1,177 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package tsconsensus + +import ( + "context" + "net/netip" + "testing" + + "tailscale.com/ipn" + "tailscale.com/ipn/ipnstate" + "tailscale.com/types/key" + "tailscale.com/types/views" +) + +type testStatusGetter struct { + status *ipnstate.Status +} + +func (sg testStatusGetter) getStatus(ctx context.Context) (*ipnstate.Status, error) { + return sg.status, nil +} + +const testTag string = "tag:clusterTag" + +func authForStatus(s 
*ipnstate.Status) *authorization { + return &authorization{ + sg: testStatusGetter{ + status: s, + }, + tag: testTag, + } +} + +func addrsForIndex(i int) []netip.Addr { + return []netip.Addr{ + netip.AddrFrom4([4]byte{100, 0, 0, byte(i)}), + netip.AddrFrom4([4]byte{100, 0, 1, byte(i)}), + } +} + +func statusForTags(self []string, peers [][]string) *ipnstate.Status { + selfTags := views.SliceOf(self) + s := &ipnstate.Status{ + BackendState: ipn.Running.String(), + Self: &ipnstate.PeerStatus{ + Tags: &selfTags, + }, + Peer: map[key.NodePublic]*ipnstate.PeerStatus{}, + } + for i, tagStrings := range peers { + tags := views.SliceOf(tagStrings) + s.Peer[key.NewNode().Public()] = &ipnstate.PeerStatus{ + Tags: &tags, + TailscaleIPs: addrsForIndex(i), + } + + } + return s +} + +func authForTags(self []string, peers [][]string) *authorization { + return authForStatus(statusForTags(self, peers)) +} + +func TestAuthRefreshErrorsNotRunning(t *testing.T) { + ctx := context.Background() + + a := authForStatus(nil) + err := a.Refresh(ctx) + if err == nil { + t.Fatalf("expected err to be non-nil") + } + expected := "no status" + if err.Error() != expected { + t.Fatalf("expected: %s, got: %s", expected, err.Error()) + } + + a = authForStatus(&ipnstate.Status{ + BackendState: "NeedsMachineAuth", + }) + err = a.Refresh(ctx) + if err == nil { + t.Fatalf("expected err to be non-nil") + } + expected = "ts Server is not running" + if err.Error() != expected { + t.Fatalf("expected: %s, got: %s", expected, err.Error()) + } +} + +func TestAuthUnrefreshed(t *testing.T) { + a := authForStatus(nil) + if a.AllowsHost(netip.MustParseAddr("100.0.0.1")) { + t.Fatalf("never refreshed authorization, allowsHost: expected false, got true") + } + gotAllowedPeers := a.AllowedPeers() + if gotAllowedPeers.Len() != 0 { + t.Fatalf("never refreshed authorization, allowedPeers: expected [], got %v", gotAllowedPeers) + } + if a.SelfAllowed() != false { + t.Fatalf("never refreshed authorization, selfAllowed: expected false got true") + } +} + +func TestAuthAllowsHost(t *testing.T) { + ctx := context.Background() + peerTags := [][]string{ + {"woo"}, + nil, + {"woo", testTag}, + {testTag}, + } + expected := []bool{ + false, + false, + true, + true, + } + a := authForTags(nil, peerTags) + err := a.Refresh(ctx) + if err != nil { + t.Fatal(err) + } + + for i, tags := range peerTags { + for _, addr := range addrsForIndex(i) { + got := a.AllowsHost(addr) + if got != expected[i] { + t.Fatalf("allowed %v, expected: %t, got %t", tags, expected[i], got) + } + } + } +} + +func TestAuthAllowedPeers(t *testing.T) { + ctx := context.Background() + a := authForTags(nil, [][]string{ + {"woo"}, + nil, + {"woo", testTag}, + {testTag}, + }) + err := a.Refresh(ctx) + if err != nil { + t.Fatal(err) + } + ps := a.AllowedPeers() + if ps.Len() != 2 { + t.Fatalf("expected: 2, got: %d", ps.Len()) + } +} + +func TestAuthSelfAllowed(t *testing.T) { + ctx := context.Background() + + a := authForTags([]string{"woo"}, nil) + err := a.Refresh(ctx) + if err != nil { + t.Fatal(err) + } + got := a.SelfAllowed() + if got { + t.Fatalf("expected: false, got: %t", got) + } + + a = authForTags([]string{"woo", testTag}, nil) + err = a.Refresh(ctx) + if err != nil { + t.Fatal(err) + } + got = a.SelfAllowed() + if !got { + t.Fatalf("expected: true, got: %t", got) + } +} diff --git a/tsconsensus/http.go b/tsconsensus/http.go new file mode 100644 index 000000000..a1f06cf5a --- /dev/null +++ b/tsconsensus/http.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// 
SPDX-License-Identifier: BSD-3-Clause + +package tsconsensus + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "time" + + "tailscale.com/util/httpm" +) + +type joinRequest struct { + RemoteHost string + RemoteID string +} + +type commandClient struct { + port uint16 + httpClient *http.Client +} + +func (rac *commandClient) url(host string, path string) string { + return fmt.Sprintf("http://%s:%d%s", host, rac.port, path) +} + +const maxBodyBytes = 1024 * 1024 + +func readAllMaxBytes(r io.Reader) ([]byte, error) { + return io.ReadAll(io.LimitReader(r, maxBodyBytes)) +} + +func (rac *commandClient) join(host string, jr joinRequest) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + rBs, err := json.Marshal(jr) + if err != nil { + return err + } + url := rac.url(host, "/join") + req, err := http.NewRequestWithContext(ctx, httpm.POST, url, bytes.NewReader(rBs)) + if err != nil { + return err + } + resp, err := rac.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + respBs, err := readAllMaxBytes(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return fmt.Errorf("remote responded %d: %s", resp.StatusCode, string(respBs)) + } + return nil +} + +func (rac *commandClient) executeCommand(host string, bs []byte) (CommandResult, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + url := rac.url(host, "/executeCommand") + req, err := http.NewRequestWithContext(ctx, httpm.POST, url, bytes.NewReader(bs)) + if err != nil { + return CommandResult{}, err + } + resp, err := rac.httpClient.Do(req) + if err != nil { + return CommandResult{}, err + } + defer resp.Body.Close() + respBs, err := readAllMaxBytes(resp.Body) + if err != nil { + return CommandResult{}, err + } + if resp.StatusCode != 200 { + return CommandResult{}, fmt.Errorf("remote responded %d: %s", resp.StatusCode, string(respBs)) + } + var cr CommandResult + if err = json.Unmarshal(respBs, &cr); err != nil { + return CommandResult{}, err + } + return cr, nil +} + +type authedHandler struct { + auth *authorization + handler http.Handler +} + +func (h authedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + err := h.auth.Refresh(r.Context()) + if err != nil { + log.Printf("error authedHandler ServeHTTP refresh auth: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + a, err := addrFromServerAddress(r.RemoteAddr) + if err != nil { + log.Printf("error authedHandler ServeHTTP refresh auth: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + allowed := h.auth.AllowsHost(a) + if !allowed { + http.Error(w, "peer not allowed", http.StatusForbidden) + return + } + h.handler.ServeHTTP(w, r) +} + +func (c *Consensus) handleJoinHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + decoder := json.NewDecoder(http.MaxBytesReader(w, r.Body, maxBodyBytes)) + var jr joinRequest + err := decoder.Decode(&jr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + _, err = decoder.Token() + if !errors.Is(err, io.EOF) { + http.Error(w, "Request body must only contain a single JSON object", http.StatusBadRequest) + return + } + if jr.RemoteHost == "" { + http.Error(w, "Required: remoteAddr", http.StatusBadRequest) + return + } + if jr.RemoteID == "" { + http.Error(w, "Required: remoteID", http.StatusBadRequest) + return + } + err = c.handleJoin(jr) + if 
err != nil { + log.Printf("join handler error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } +} + +func (c *Consensus) handleExecuteCommandHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + decoder := json.NewDecoder(r.Body) + var cmd Command + err := decoder.Decode(&cmd) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + result, err := c.executeCommandLocally(cmd) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := json.NewEncoder(w).Encode(result); err != nil { + log.Printf("error encoding execute command result: %v", err) + return + } +} + +func (c *Consensus) makeCommandMux() *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("POST /join", c.handleJoinHTTP) + mux.HandleFunc("POST /executeCommand", c.handleExecuteCommandHTTP) + return mux +} + +func (c *Consensus) makeCommandHandler(auth *authorization) http.Handler { + return authedHandler{ + handler: c.makeCommandMux(), + auth: auth, + } +} diff --git a/tsconsensus/monitor.go b/tsconsensus/monitor.go new file mode 100644 index 000000000..e337e78be --- /dev/null +++ b/tsconsensus/monitor.go @@ -0,0 +1,161 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package tsconsensus + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "slices" + + "tailscale.com/ipn" + "tailscale.com/ipn/ipnstate" + "tailscale.com/tsnet" + "tailscale.com/util/dnsname" +) + +type status struct { + Status *ipnstate.Status + RaftState string +} + +type monitor struct { + ts *tsnet.Server + con *Consensus +} + +func (m *monitor) getStatus(ctx context.Context) (status, error) { + lc, err := m.ts.LocalClient() + if err != nil { + return status{}, err + } + tStatus, err := lc.Status(ctx) + if err != nil { + return status{}, err + } + return status{Status: tStatus, RaftState: m.con.raft.State().String()}, nil +} + +func serveMonitor(c *Consensus, ts *tsnet.Server, listenAddr string) (*http.Server, error) { + ln, err := ts.Listen("tcp", listenAddr) + if err != nil { + return nil, err + } + m := &monitor{con: c, ts: ts} + mux := http.NewServeMux() + mux.HandleFunc("GET /full", m.handleFullStatus) + mux.HandleFunc("GET /{$}", m.handleSummaryStatus) + mux.HandleFunc("GET /netmap", m.handleNetmap) + mux.HandleFunc("POST /dial", m.handleDial) + srv := &http.Server{Handler: mux} + go func() { + err := srv.Serve(ln) + log.Printf("MonitorHTTP stopped serving with error: %v", err) + }() + return srv, nil +} + +func (m *monitor) handleFullStatus(w http.ResponseWriter, r *http.Request) { + s, err := m.getStatus(r.Context()) + if err != nil { + log.Printf("monitor: error getStatus: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + if err := json.NewEncoder(w).Encode(s); err != nil { + log.Printf("monitor: error encoding full status: %v", err) + return + } +} + +func (m *monitor) handleSummaryStatus(w http.ResponseWriter, r *http.Request) { + s, err := m.getStatus(r.Context()) + if err != nil { + log.Printf("monitor: error getStatus: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + lines := []string{} + for _, p := range s.Status.Peer { + if p.Online { + name := dnsname.FirstLabel(p.DNSName) + lines = append(lines, fmt.Sprintf("%s\t\t%d\t%d\t%t", name, p.RxBytes, p.TxBytes, p.Active)) + } + } + _, err = w.Write([]byte(fmt.Sprintf("RaftState: %s\n", s.RaftState))) + if err != nil { + log.Printf("monitor: error 
writing status: %v", err) + return + } + + slices.Sort(lines) + for _, l := range lines { + _, err = w.Write([]byte(fmt.Sprintf("%s\n", l))) + if err != nil { + log.Printf("monitor: error writing status: %v", err) + return + } + } +} + +func (m *monitor) handleNetmap(w http.ResponseWriter, r *http.Request) { + var mask ipn.NotifyWatchOpt = ipn.NotifyInitialNetMap + mask |= ipn.NotifyNoPrivateKeys + lc, err := m.ts.LocalClient() + if err != nil { + log.Printf("monitor: error LocalClient: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + watcher, err := lc.WatchIPNBus(r.Context(), mask) + if err != nil { + log.Printf("monitor: error WatchIPNBus: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + defer watcher.Close() + + n, err := watcher.Next() + if err != nil { + log.Printf("monitor: error watcher.Next: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + encoder := json.NewEncoder(w) + encoder.SetIndent("", "\t") + if err := encoder.Encode(n); err != nil { + log.Printf("monitor: error encoding netmap: %v", err) + return + } +} + +func (m *monitor) handleDial(w http.ResponseWriter, r *http.Request) { + var dialParams struct { + Addr string + } + defer r.Body.Close() + bs, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes)) + if err != nil { + log.Printf("monitor: error reading body: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + err = json.Unmarshal(bs, &dialParams) + if err != nil { + log.Printf("monitor: error unmarshalling json: %v", err) + http.Error(w, "", http.StatusBadRequest) + return + } + c, err := m.ts.Dial(r.Context(), "tcp", dialParams.Addr) + if err != nil { + log.Printf("monitor: error dialing: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + c.Close() + w.Write([]byte("ok\n")) +} diff --git a/tsconsensus/tsconsensus.go b/tsconsensus/tsconsensus.go new file mode 100644 index 000000000..6cca024ee --- /dev/null +++ b/tsconsensus/tsconsensus.go @@ -0,0 +1,445 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package tsconsensus implements a consensus algorithm for a group of tsnet.Servers. +// +// The Raft consensus algorithm relies on you implementing a state machine that will give the same +// result to a given command as long as the same logs have been applied in the same order. +// +// tsconsensus uses the hashicorp/raft library to implement leader elections and log application. +// +// tsconsensus provides: +// - cluster peer discovery based on tailscale tags +// - executing a command on the leader +// - communication between cluster peers over tailscale using tsnet +// +// Users implement a state machine that satisfies the raft.FSM interface, with the business logic they desire. +// When changes to state are needed, any node may: +// - create a Command instance with serialized Args. +// - call ExecuteCommand with the Command instance: +// this will propagate the command to the leader, +// and then from the leader to every node via raft. +// - the state machine's raft.Apply implementation dispatches commands via the Command.Name, +// returning a CommandResult with an Err or a serialized Result.
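(Reviewer note: below is a minimal, illustrative sketch of the consumer-side pattern the package doc above describes. It is not part of this diff; the counterFSM type, the runNode helper, and the tag:my-cluster tag are assumptions, while Start, DefaultConfig, Command, CommandResult, and ExecuteCommand are the APIs added in this change.)

package example

import (
	"context"
	"encoding/json"
	"io"
	"sync"

	"github.com/hashicorp/raft"
	"tailscale.com/tsconsensus"
	"tailscale.com/tsnet"
)

// counterFSM is a toy state machine: it counts how many "inc" commands
// have been applied, in the same order, on every node.
type counterFSM struct {
	mu sync.Mutex
	n  int
}

// Apply is called by raft with the same log entries on every node; it decodes
// the Command and returns a CommandResult, as the package doc describes.
func (f *counterFSM) Apply(l *raft.Log) any {
	var cmd tsconsensus.Command
	if err := json.Unmarshal(l.Data, &cmd); err != nil {
		return tsconsensus.CommandResult{Err: err}
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	if cmd.Name == "inc" {
		f.n++
	}
	out, err := json.Marshal(f.n)
	return tsconsensus.CommandResult{Result: out, Err: err}
}

// Snapshot and Restore are required by raft.FSM; this sketch keeps no snapshots.
func (f *counterFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, nil }
func (f *counterFSM) Restore(io.ReadCloser) error         { return nil }

// runNode joins (or starts) the cluster of nodes sharing the tag and issues one command.
func runNode(ctx context.Context, ts *tsnet.Server) error {
	c, err := tsconsensus.Start(ctx, ts, &counterFSM{}, "tag:my-cluster", tsconsensus.DefaultConfig())
	if err != nil {
		return err
	}
	defer c.Stop(ctx)

	// Any node may call ExecuteCommand; the command is forwarded to the
	// leader and then applied on every node via raft.
	res, err := c.ExecuteCommand(tsconsensus.Command{Name: "inc"})
	if err != nil {
		return err
	}
	if res.Err != nil {
		return res.Err
	}
	var count int
	return json.Unmarshal(res.Result, &count)
}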
+package tsconsensus + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net" + "net/http" + "net/netip" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/raft" + "tailscale.com/ipn/ipnstate" + "tailscale.com/tsnet" + "tailscale.com/types/views" +) + +func raftAddr(host netip.Addr, cfg Config) string { + return netip.AddrPortFrom(host, cfg.RaftPort).String() +} + +func addrFromServerAddress(sa string) (netip.Addr, error) { + addrPort, err := netip.ParseAddrPort(sa) + if err != nil { + return netip.Addr{}, err + } + return addrPort.Addr(), nil +} + +// A selfRaftNode is the info we need to talk to hashicorp/raft about our node. +// We specify the ID and Addr on Consensus Start, and then use it later for raft +// operations such as BootstrapCluster and AddVoter. +type selfRaftNode struct { + id string + hostAddr netip.Addr +} + +// A Config holds configurable values such as ports and timeouts. +// Use DefaultConfig to get a useful Config. +type Config struct { + CommandPort uint16 + RaftPort uint16 + MonitorPort uint16 + Raft *raft.Config + MaxConnPool int + ConnTimeout time.Duration + ServeDebugMonitor bool +} + +// DefaultConfig returns a Config populated with default values ready for use. +func DefaultConfig() Config { + raftConfig := raft.DefaultConfig() + // these values are 2x the raft DefaultConfig + raftConfig.HeartbeatTimeout = 2000 * time.Millisecond + raftConfig.ElectionTimeout = 2000 * time.Millisecond + raftConfig.LeaderLeaseTimeout = 1000 * time.Millisecond + + return Config{ + CommandPort: 6271, + RaftPort: 6270, + MonitorPort: 8081, + Raft: raftConfig, + MaxConnPool: 5, + ConnTimeout: 5 * time.Second, + } +} + +// StreamLayer implements an interface asked for by raft.NetworkTransport. +// It does the raft interprocess communication via tailscale. +type StreamLayer struct { + net.Listener + s *tsnet.Server + auth *authorization + shutdownCtx context.Context +} + +// Dial implements the raft.StreamLayer interface with the tsnet.Server's Dial. +func (sl StreamLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) { + ctx, cancel := context.WithTimeout(sl.shutdownCtx, timeout) + defer cancel() + authorized, err := sl.addrAuthorized(ctx, string(address)) + if err != nil { + return nil, err + } + if !authorized { + return nil, errors.New("peer is not allowed") + } + return sl.s.Dial(ctx, "tcp", string(address)) +} + +func (sl StreamLayer) addrAuthorized(ctx context.Context, address string) (bool, error) { + addr, err := addrFromServerAddress(address) + if err != nil { + // bad RemoteAddr is not authorized + return false, nil + } + err = sl.auth.Refresh(ctx) + if err != nil { + // might be authorized, we couldn't tell + return false, err + } + return sl.auth.AllowsHost(addr), nil +} + +func (sl StreamLayer) Accept() (net.Conn, error) { + ctx, cancel := context.WithCancel(sl.shutdownCtx) + defer cancel() + for { + conn, err := sl.Listener.Accept() + if err != nil || conn == nil { + return conn, err + } + addr := conn.RemoteAddr() + if addr == nil { + conn.Close() + return nil, errors.New("conn has no remote addr") + } + authorized, err := sl.addrAuthorized(ctx, addr.String()) + if err != nil { + conn.Close() + return nil, err + } + if !authorized { + conn.Close() + continue + } + return conn, err + } +} + +// Start returns a pointer to a running Consensus instance. 
+// Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster +// with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state +// machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members' +// state machines using Raft. +func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config) (*Consensus, error) { + if clusterTag == "" { + return nil, errors.New("cluster tag must be provided") + } + + cc := commandClient{ + port: cfg.CommandPort, + httpClient: ts.HTTPClient(), + } + v4, _ := ts.TailscaleIPs() + self := selfRaftNode{ + id: v4.String(), + hostAddr: v4, + } + shutdownCtx, shutdownCtxCancel := context.WithCancel(ctx) + c := Consensus{ + commandClient: &cc, + self: self, + config: cfg, + shutdownCtxCancel: shutdownCtxCancel, + } + + auth := newAuthorization(ts, clusterTag) + err := auth.Refresh(shutdownCtx) + if err != nil { + return nil, fmt.Errorf("auth refresh: %w", err) + } + if !auth.SelfAllowed() { + return nil, errors.New("this node is not tagged with the cluster tag") + } + + // after startRaft it's possible some other raft node that has us in their configuration will get + // in contact, so by the time we do anything else we may already be a functioning member + // of a consensus + r, err := startRaft(shutdownCtx, ts, &fsm, c.self, auth, cfg) + if err != nil { + return nil, err + } + c.raft = r + + srv, err := c.serveCommandHTTP(ts, auth) + if err != nil { + return nil, err + } + c.cmdHttpServer = srv + + c.bootstrap(auth.AllowedPeers()) + + if cfg.ServeDebugMonitor { + srv, err = serveMonitor(&c, ts, netip.AddrPortFrom(c.self.hostAddr, cfg.MonitorPort).String()) + if err != nil { + return nil, err + } + c.monitorHttpServer = srv + } + + return &c, nil +} + +func startRaft(shutdownCtx context.Context, ts *tsnet.Server, fsm *raft.FSM, self selfRaftNode, auth *authorization, cfg Config) (*raft.Raft, error) { + cfg.Raft.LocalID = raft.ServerID(self.id) + + // no persistence (for now?) + logStore := raft.NewInmemStore() + stableStore := raft.NewInmemStore() + snapshots := raft.NewInmemSnapshotStore() + + // opens the listener on the raft port, raft will close it when it thinks it's appropriate + ln, err := ts.Listen("tcp", raftAddr(self.hostAddr, cfg)) + if err != nil { + return nil, err + } + + logger := hclog.New(&hclog.LoggerOptions{ + Name: "raft-net", + Output: cfg.Raft.LogOutput, + Level: hclog.LevelFromString(cfg.Raft.LogLevel), + }) + + transport := raft.NewNetworkTransportWithLogger(StreamLayer{ + s: ts, + Listener: ln, + auth: auth, + shutdownCtx: shutdownCtx, + }, + cfg.MaxConnPool, + cfg.ConnTimeout, + logger) + + return raft.NewRaft(cfg.Raft, *fsm, logStore, stableStore, snapshots, transport) +} + +// A Consensus is the consensus algorithm for a tsnet.Server +// It wraps a raft.Raft instance and performs the peer discovery +// and command execution on the leader. +type Consensus struct { + raft *raft.Raft + commandClient *commandClient + self selfRaftNode + config Config + cmdHttpServer *http.Server + monitorHttpServer *http.Server + shutdownCtxCancel context.CancelFunc +} + +// bootstrap tries to join a raft cluster, or start one. +// +// We need to do the very first raft cluster configuration, but after that raft manages it. +// bootstrap is called at start up, and we are not currently aware of what the cluster config might be, +// our node may already be in it. 
Try to join the raft cluster of all the other nodes we know about, and + // if unsuccessful, assume we are the first and start our own. + // + // It's possible for bootstrap to return an error, or start an errant breakaway cluster. + // + // We have a list of expected cluster members already from control (the members of the tailnet with the tag) + // so we could do the initial configuration with all servers specified. + // Choose to start with just this machine in the raft configuration instead, as: + // - We want to handle machines joining after start anyway. + // - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now, + // so let each node opt in when able. +func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error { + log.Printf("Trying to find cluster: num targets to try: %d", targets.Len()) + for _, p := range targets.All() { + if !p.Online { + log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0]) + continue + } + log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0]) + err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{ + RemoteHost: c.self.hostAddr.String(), + RemoteID: c.self.id, + }) + if err != nil { + log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err) + continue + } + log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0]) + return nil + } + + log.Printf("Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String()) + f := c.raft.BootstrapCluster( + raft.Configuration{ + Servers: []raft.Server{ + { + ID: raft.ServerID(c.self.id), + Address: raft.ServerAddress(c.raftAddr(c.self.hostAddr)), + }, + }, + }) + return f.Error() +} + +// ExecuteCommand propagates a Command to be executed on the leader, which +// uses raft to Apply it to the followers. +func (c *Consensus) ExecuteCommand(cmd Command) (CommandResult, error) { + b, err := json.Marshal(cmd) + if err != nil { + return CommandResult{}, err + } + result, err := c.executeCommandLocally(cmd) + var leErr lookElsewhereError + for errors.As(err, &leErr) { + result, err = c.commandClient.executeCommand(leErr.where, b) + } + return result, err +} + +// Stop attempts to gracefully shut down various components. +func (c *Consensus) Stop(ctx context.Context) error { + fut := c.raft.Shutdown() + err := fut.Error() + if err != nil { + log.Printf("Stop: Error in Raft Shutdown: %v", err) + } + c.shutdownCtxCancel() + err = c.cmdHttpServer.Shutdown(ctx) + if err != nil { + log.Printf("Stop: Error in command HTTP Shutdown: %v", err) + } + if c.monitorHttpServer != nil { + err = c.monitorHttpServer.Shutdown(ctx) + if err != nil { + log.Printf("Stop: Error in monitor HTTP Shutdown: %v", err) + } + } + return nil +} + +// A Command is a representation of a state machine action. +type Command struct { + // The Name can be used to dispatch the command when received. + Name string + // The Args are serialized for transport. + Args json.RawMessage +} + +// A CommandResult is a representation of the result of a state +// machine action. +type CommandResult struct { + // Err is any error that occurred on the node that tried to execute the command, + // including any error from the underlying operation and deserialization problems etc. + Err error + // Result is serialized for transport.
+ Result json.RawMessage +} + +type lookElsewhereError struct { + where string +} + +func (e lookElsewhereError) Error() string { + return fmt.Sprintf("not the leader, try: %s", e.where) +} + +var errLeaderUnknown = errors.New("leader unknown") + +func (c *Consensus) serveCommandHTTP(ts *tsnet.Server, auth *authorization) (*http.Server, error) { + ln, err := ts.Listen("tcp", c.commandAddr(c.self.hostAddr)) + if err != nil { + return nil, err + } + srv := &http.Server{Handler: c.makeCommandHandler(auth)} + go func() { + err := srv.Serve(ln) + log.Printf("CmdHttp stopped serving with err: %v", err) + }() + return srv, nil +} + +func (c *Consensus) getLeader() (string, error) { + raftLeaderAddr, _ := c.raft.LeaderWithID() + leaderAddr := (string)(raftLeaderAddr) + if leaderAddr == "" { + // Raft doesn't know who the leader is. + return "", errLeaderUnknown + } + // Raft gives us the address with the raft port, we don't always want that. + host, _, err := net.SplitHostPort(leaderAddr) + return host, err +} + +func (c *Consensus) executeCommandLocally(cmd Command) (CommandResult, error) { + b, err := json.Marshal(cmd) + if err != nil { + return CommandResult{}, err + } + f := c.raft.Apply(b, 0) + err = f.Error() + result := f.Response() + if errors.Is(err, raft.ErrNotLeader) { + leader, err := c.getLeader() + if err != nil { + // we know we're not leader but we were unable to give the address of the leader + return CommandResult{}, err + } + return CommandResult{}, lookElsewhereError{where: leader} + } + if result == nil { + result = CommandResult{} + } + return result.(CommandResult), err +} + +func (c *Consensus) handleJoin(jr joinRequest) error { + addr, err := netip.ParseAddr(jr.RemoteHost) + if err != nil { + return err + } + remoteAddr := c.raftAddr(addr) + f := c.raft.AddVoter(raft.ServerID(jr.RemoteID), raft.ServerAddress(remoteAddr), 0, 0) + if f.Error() != nil { + return f.Error() + } + return nil +} + +func (c *Consensus) raftAddr(host netip.Addr) string { + return raftAddr(host, c.config) +} + +func (c *Consensus) commandAddr(host netip.Addr) string { + return netip.AddrPortFrom(host, c.config.CommandPort).String() +} diff --git a/tsconsensus/tsconsensus_test.go b/tsconsensus/tsconsensus_test.go new file mode 100644 index 000000000..9ddd345de --- /dev/null +++ b/tsconsensus/tsconsensus_test.go @@ -0,0 +1,738 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package tsconsensus + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "net/netip" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/raft" + "tailscale.com/client/tailscale" + "tailscale.com/ipn/store/mem" + "tailscale.com/net/netns" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" + "tailscale.com/tstest/integration" + "tailscale.com/tstest/integration/testcontrol" + "tailscale.com/tstest/nettest" + "tailscale.com/types/key" + "tailscale.com/types/logger" + "tailscale.com/types/views" + "tailscale.com/util/racebuild" +) + +type fsm struct { + mu sync.Mutex + applyEvents []string +} + +func commandWith(t *testing.T, s string) []byte { + jsonArgs, err := json.Marshal(s) + if err != nil { + t.Fatal(err) + } + bs, err := json.Marshal(Command{ + Args: jsonArgs, + }) + if err != nil { + t.Fatal(err) + } + return bs +} + +func fromCommand(bs []byte) (string, error) { + var cmd Command + err := 
json.Unmarshal(bs, &cmd) + if err != nil { + return "", err + } + var args string + err = json.Unmarshal(cmd.Args, &args) + if err != nil { + return "", err + } + return args, nil +} + +func (f *fsm) Apply(l *raft.Log) any { + f.mu.Lock() + defer f.mu.Unlock() + s, err := fromCommand(l.Data) + if err != nil { + return CommandResult{ + Err: err, + } + } + f.applyEvents = append(f.applyEvents, s) + result, err := json.Marshal(len(f.applyEvents)) + if err != nil { + panic("should be able to Marshal that?") + } + return CommandResult{ + Result: result, + } +} + +func (f *fsm) numEvents() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.applyEvents) +} + +func (f *fsm) eventsMatch(es []string) bool { + f.mu.Lock() + defer f.mu.Unlock() + return cmp.Equal(es, f.applyEvents) +} + +func (f *fsm) Snapshot() (raft.FSMSnapshot, error) { + return nil, nil +} + +func (f *fsm) Restore(rc io.ReadCloser) error { + return nil +} + +func testConfig(t *testing.T) { + // -race AND Parallel makes things start to take too long. + if !racebuild.On { + t.Parallel() + } + nettest.SkipIfNoNetwork(t) +} + +func startControl(t testing.TB) (control *testcontrol.Server, controlURL string) { + t.Helper() + // tailscale/corp#4520: don't use netns for tests. + netns.SetEnabled(false) + t.Cleanup(func() { + netns.SetEnabled(true) + }) + + derpLogf := logger.Discard + derpMap := integration.RunDERPAndSTUN(t, derpLogf, "127.0.0.1") + control = &testcontrol.Server{ + DERPMap: derpMap, + DNSConfig: &tailcfg.DNSConfig{ + Proxied: true, + }, + MagicDNSDomain: "tail-scale.ts.net", + } + control.HTTPTestServer = httptest.NewUnstartedServer(control) + control.HTTPTestServer.Start() + t.Cleanup(control.HTTPTestServer.Close) + controlURL = control.HTTPTestServer.URL + t.Logf("testcontrol listening on %s", controlURL) + return control, controlURL +} + +func startNode(t testing.TB, ctx context.Context, controlURL, hostname string) (*tsnet.Server, key.NodePublic, netip.Addr) { + t.Helper() + + tmp := filepath.Join(t.TempDir(), hostname) + os.MkdirAll(tmp, 0755) + s := &tsnet.Server{ + Dir: tmp, + ControlURL: controlURL, + Hostname: hostname, + Store: new(mem.Store), + Ephemeral: true, + } + t.Cleanup(func() { s.Close() }) + + status, err := s.Up(ctx) + if err != nil { + t.Fatal(err) + } + return s, status.Self.PublicKey, status.TailscaleIPs[0] +} + +func waitForNodesToBeTaggedInStatus(t testing.TB, ctx context.Context, ts *tsnet.Server, nodeKeys []key.NodePublic, tag string) { + t.Helper() + waitFor(t, "nodes tagged in status", func() bool { + lc, err := ts.LocalClient() + if err != nil { + t.Fatal(err) + } + status, err := lc.Status(ctx) + if err != nil { + t.Fatalf("error getting status: %v", err) + } + for _, k := range nodeKeys { + var tags *views.Slice[string] + if k == status.Self.PublicKey { + tags = status.Self.Tags + } else { + tags = status.Peer[k].Tags + } + if tag == "" { + if tags != nil && tags.Len() != 0 { + return false + } + } else { + if tags == nil { + return false + } + if tags.Len() != 1 || tags.At(0) != tag { + return false + } + } + } + return true + }, 2*time.Second) +} + +func tagNodes(t testing.TB, control *testcontrol.Server, nodeKeys []key.NodePublic, tag string) { + t.Helper() + for _, key := range nodeKeys { + n := control.Node(key) + if tag == "" { + if len(n.Tags) != 1 { + t.Fatalf("expected tags to have one tag") + } + n.Tags = nil + } else { + if len(n.Tags) != 0 { + // if we want this to work with multiple tags we'll have to change the logic + // for checking if a tag got removed yet. 
+ t.Fatalf("expected tags to be empty") + } + n.Tags = append(n.Tags, tag) + } + b := true + n.Online = &b + control.UpdateNode(n) + } +} + +func addIDedLogger(id string, c Config) Config { + // logs that identify themselves + c.Raft.Logger = hclog.New(&hclog.LoggerOptions{ + Name: fmt.Sprintf("raft: %s", id), + Output: c.Raft.LogOutput, + Level: hclog.LevelFromString(c.Raft.LogLevel), + }) + return c +} + +func warnLogConfig() Config { + c := DefaultConfig() + // fewer logs from raft + c.Raft.LogLevel = "WARN" + // timeouts long enough that we can form a cluster under -race + c.Raft.LeaderLeaseTimeout = 2 * time.Second + c.Raft.HeartbeatTimeout = 4 * time.Second + c.Raft.ElectionTimeout = 4 * time.Second + return c +} + +func TestStart(t *testing.T) { + testConfig(t) + control, controlURL := startControl(t) + ctx := context.Background() + one, k, _ := startNode(t, ctx, controlURL, "one") + + clusterTag := "tag:whatever" + // nodes must be tagged with the cluster tag, to find each other + tagNodes(t, control, []key.NodePublic{k}, clusterTag) + waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag) + + sm := &fsm{} + r, err := Start(ctx, one, sm, clusterTag, warnLogConfig()) + if err != nil { + t.Fatal(err) + } + defer r.Stop(ctx) +} + +func waitFor(t testing.TB, msg string, condition func() bool, waitBetweenTries time.Duration) { + t.Helper() + try := 0 + for true { + try++ + done := condition() + if done { + t.Logf("waitFor success: %s: after %d tries", msg, try) + return + } + time.Sleep(waitBetweenTries) + } +} + +type participant struct { + c *Consensus + sm *fsm + ts *tsnet.Server + key key.NodePublic +} + +// starts and tags the *tsnet.Server nodes with the control, waits for the nodes to make successful +// LocalClient Status calls that show the first node as Online. +func startNodesAndWaitForPeerStatus(t testing.TB, ctx context.Context, clusterTag string, nNodes int) ([]*participant, *testcontrol.Server, string) { + t.Helper() + ps := make([]*participant, nNodes) + keysToTag := make([]key.NodePublic, nNodes) + localClients := make([]*tailscale.LocalClient, nNodes) + control, controlURL := startControl(t) + for i := 0; i < nNodes; i++ { + ts, key, _ := startNode(t, ctx, controlURL, fmt.Sprintf("node %d", i)) + ps[i] = &participant{ts: ts, key: key} + keysToTag[i] = key + lc, err := ts.LocalClient() + if err != nil { + t.Fatalf("%d: error getting local client: %v", i, err) + } + localClients[i] = lc + } + tagNodes(t, control, keysToTag, clusterTag) + waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, keysToTag, clusterTag) + fxCameOnline := func() bool { + // all the _other_ nodes see the first as online + for i := 1; i < nNodes; i++ { + status, err := localClients[i].Status(ctx) + if err != nil { + t.Fatalf("%d: error getting status: %v", i, err) + } + if !status.Peer[ps[0].key].Online { + return false + } + } + return true + } + waitFor(t, "other nodes see node 1 online in ts status", fxCameOnline, 2*time.Second) + return ps, control, controlURL +} + +// populates participants with their consensus fields, waits for all nodes to show all nodes +// as part of the same consensus cluster. Starts the first participant first and waits for it to +// become leader before adding other nodes. 
+func createConsensusCluster(t testing.TB, ctx context.Context, clusterTag string, participants []*participant, cfg Config) { + t.Helper() + participants[0].sm = &fsm{} + myCfg := addIDedLogger("0", cfg) + first, err := Start(ctx, participants[0].ts, participants[0].sm, clusterTag, myCfg) + if err != nil { + t.Fatal(err) + } + fxFirstIsLeader := func() bool { + return first.raft.State() == raft.Leader + } + waitFor(t, "node 0 is leader", fxFirstIsLeader, 2*time.Second) + participants[0].c = first + + for i := 1; i < len(participants); i++ { + participants[i].sm = &fsm{} + myCfg := addIDedLogger(fmt.Sprintf("%d", i), cfg) + c, err := Start(ctx, participants[i].ts, participants[i].sm, clusterTag, myCfg) + if err != nil { + t.Fatal(err) + } + participants[i].c = c + } + + fxRaftConfigContainsAll := func() bool { + for i := 0; i < len(participants); i++ { + fut := participants[i].c.raft.GetConfiguration() + err = fut.Error() + if err != nil { + t.Fatalf("%d: Getting Configuration errored: %v", i, err) + } + if len(fut.Configuration().Servers) != len(participants) { + return false + } + } + return true + } + waitFor(t, "all raft machines have all servers in their config", fxRaftConfigContainsAll, time.Second*2) +} + +func TestApply(t *testing.T) { + testConfig(t) + ctx := context.Background() + clusterTag := "tag:whatever" + ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 2) + cfg := warnLogConfig() + createConsensusCluster(t, ctx, clusterTag, ps, cfg) + for _, p := range ps { + defer p.c.Stop(ctx) + } + + fut := ps[0].c.raft.Apply(commandWith(t, "woo"), 2*time.Second) + err := fut.Error() + if err != nil { + t.Fatalf("Raft Apply Error: %v", err) + } + + want := []string{"woo"} + fxBothMachinesHaveTheApply := func() bool { + return ps[0].sm.eventsMatch(want) && ps[1].sm.eventsMatch(want) + } + waitFor(t, "the apply event made it into both state machines", fxBothMachinesHaveTheApply, time.Second*1) +} + +// calls ExecuteCommand on each participant and checks that all participants get all commands +func assertCommandsWorkOnAnyNode(t testing.TB, participants []*participant) { + t.Helper() + want := []string{} + for i, p := range participants { + si := fmt.Sprintf("%d", i) + want = append(want, si) + bs, err := json.Marshal(si) + if err != nil { + t.Fatal(err) + } + res, err := p.c.ExecuteCommand(Command{Args: bs}) + if err != nil { + t.Fatalf("%d: Error ExecuteCommand: %v", i, err) + } + if res.Err != nil { + t.Fatalf("%d: Result Error ExecuteCommand: %v", i, res.Err) + } + var retVal int + err = json.Unmarshal(res.Result, &retVal) + if err != nil { + t.Fatal(err) + } + // the test implementation of the fsm returns the count of events that have been received + if retVal != i+1 { + t.Fatalf("Result, want %d, got %d", i+1, retVal) + } + + fxEventsInAll := func() bool { + for _, pOther := range participants { + if !pOther.sm.eventsMatch(want) { + return false + } + } + return true + } + waitFor(t, "event makes it to all", fxEventsInAll, time.Second*1) + } +} + +func TestConfig(t *testing.T) { + testConfig(t) + ctx := context.Background() + clusterTag := "tag:whatever" + ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3) + cfg := warnLogConfig() + // test all is well with non default ports + cfg.CommandPort = 12347 + cfg.RaftPort = 11882 + mp := uint16(8798) + cfg.MonitorPort = mp + cfg.ServeDebugMonitor = true + createConsensusCluster(t, ctx, clusterTag, ps, cfg) + for _, p := range ps { + defer p.c.Stop(ctx) + } + assertCommandsWorkOnAnyNode(t, ps) + + url := 
fmt.Sprintf("http://%s:%d/", ps[0].c.self.hostAddr.String(), mp) + httpClientOnTailnet := ps[1].ts.HTTPClient() + rsp, err := httpClientOnTailnet.Get(url) + if err != nil { + t.Fatal(err) + } + if rsp.StatusCode != 200 { + t.Fatalf("monitor status want %d, got %d", 200, rsp.StatusCode) + } + defer rsp.Body.Close() + reader := bufio.NewReader(rsp.Body) + line1, err := reader.ReadString('\n') + if err != nil { + t.Fatal(err) + } + // Not a great assertion because it relies on the format of the response. + if !strings.HasPrefix(line1, "RaftState:") { + t.Fatalf("getting monitor status, first line, want something that starts with 'RaftState:', got '%s'", line1) + } +} + +func TestFollowerFailover(t *testing.T) { + testConfig(t) + ctx := context.Background() + clusterTag := "tag:whatever" + ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3) + cfg := warnLogConfig() + createConsensusCluster(t, ctx, clusterTag, ps, cfg) + for _, p := range ps { + defer p.c.Stop(ctx) + } + + smThree := ps[2].sm + + fut := ps[0].c.raft.Apply(commandWith(t, "a"), 2*time.Second) + futTwo := ps[0].c.raft.Apply(commandWith(t, "b"), 2*time.Second) + err := fut.Error() + if err != nil { + t.Fatalf("Apply Raft error %v", err) + } + err = futTwo.Error() + if err != nil { + t.Fatalf("Apply Raft error %v", err) + } + + wantFirstTwoEvents := []string{"a", "b"} + fxAllMachinesHaveTheApplies := func() bool { + return ps[0].sm.eventsMatch(wantFirstTwoEvents) && + ps[1].sm.eventsMatch(wantFirstTwoEvents) && + smThree.eventsMatch(wantFirstTwoEvents) + } + waitFor(t, "the apply events made it into all state machines", fxAllMachinesHaveTheApplies, time.Second*1) + + //a follower goes loses contact with the cluster + ps[2].c.Stop(ctx) + + // applies still make it to one and two + futThree := ps[0].c.raft.Apply(commandWith(t, "c"), 2*time.Second) + futFour := ps[0].c.raft.Apply(commandWith(t, "d"), 2*time.Second) + err = futThree.Error() + if err != nil { + t.Fatalf("Apply Raft error %v", err) + } + err = futFour.Error() + if err != nil { + t.Fatalf("Apply Raft error %v", err) + } + wantFourEvents := []string{"a", "b", "c", "d"} + fxAliveMachinesHaveTheApplies := func() bool { + return ps[0].sm.eventsMatch(wantFourEvents) && + ps[1].sm.eventsMatch(wantFourEvents) && + smThree.eventsMatch(wantFirstTwoEvents) + } + waitFor(t, "the apply events made it into eligible state machines", fxAliveMachinesHaveTheApplies, time.Second*1) + + // follower comes back + smThreeAgain := &fsm{} + cfg = addIDedLogger("2 after restarting", warnLogConfig()) + rThreeAgain, err := Start(ctx, ps[2].ts, smThreeAgain, clusterTag, cfg) + if err != nil { + t.Fatal(err) + } + defer rThreeAgain.Stop(ctx) + fxThreeGetsCaughtUp := func() bool { + return smThreeAgain.eventsMatch(wantFourEvents) + } + waitFor(t, "the apply events made it into the third node when it appeared with an empty state machine", fxThreeGetsCaughtUp, time.Second*2) + if !smThree.eventsMatch(wantFirstTwoEvents) { + t.Fatalf("Expected smThree to remain on 2 events: got %d", smThree.numEvents()) + } +} + +func TestRejoin(t *testing.T) { + testConfig(t) + ctx := context.Background() + clusterTag := "tag:whatever" + ps, control, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3) + cfg := warnLogConfig() + createConsensusCluster(t, ctx, clusterTag, ps, cfg) + for _, p := range ps { + defer p.c.Stop(ctx) + } + + // 1st node gets a redundant second join request from the second node + ps[0].c.handleJoin(joinRequest{ + RemoteHost: ps[1].c.self.hostAddr.String(), + RemoteID: 
+		RemoteID:   ps[1].c.self.id,
+	})
+
+	tsJoiner, keyJoiner, _ := startNode(t, ctx, controlURL, "node joiner")
+	tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
+	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
+	smJoiner := &fsm{}
+	cJoiner, err := Start(ctx, tsJoiner, smJoiner, clusterTag, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ps = append(ps, &participant{
+		sm:  smJoiner,
+		c:   cJoiner,
+		ts:  tsJoiner,
+		key: keyJoiner,
+	})
+
+	assertCommandsWorkOnAnyNode(t, ps)
+}
+
+func TestOnlyTaggedPeersCanDialRaftPort(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, control, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+	createConsensusCluster(t, ctx, clusterTag, ps, cfg)
+	for _, p := range ps {
+		defer p.c.Stop(ctx)
+	}
+	assertCommandsWorkOnAnyNode(t, ps)
+
+	untaggedNode, _, _ := startNode(t, ctx, controlURL, "untagged node")
+
+	taggedNode, taggedKey, _ := startNode(t, ctx, controlURL, "tagged node")
+	tagNodes(t, control, []key.NodePublic{taggedKey}, clusterTag)
+	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{taggedKey}, clusterTag)
+
+	// surface area: command http, peer tcp
+	// untagged
+	ipv4, _ := ps[0].ts.TailscaleIPs()
+	sAddr := fmt.Sprintf("%s:%d", ipv4, cfg.RaftPort)
+
+	getErrorFromTryingToSend := func(s *tsnet.Server) error {
+		ctx := context.Background()
+		conn, err := s.Dial(ctx, "tcp", sAddr)
+		if err != nil {
+			t.Fatalf("unexpected Dial err: %v", err)
+		}
+		fmt.Fprintf(conn, "hellllllloooooo")
+		status, err := bufio.NewReader(conn).ReadString('\n')
+		if status != "" {
+			t.Fatalf("node sending non-raft message should get empty response, got: '%s' for: %s", status, s.Hostname)
+		}
+		if err == nil {
+			t.Fatalf("node sending non-raft message should get an error but got nil err for: %s", s.Hostname)
+		}
+		return err
+	}
+
+	isNetErr := func(err error) bool {
+		var netErr net.Error
+		return errors.As(err, &netErr)
+	}
+
+	err := getErrorFromTryingToSend(untaggedNode)
+	if !isNetErr(err) {
+		t.Fatalf("untagged node trying to send should get a net.Error, got: %v", err)
+	}
+	// We still get an error trying to send, but it's EOF: the target node was happy to talk
+	// to us but couldn't understand what we said.
+	err = getErrorFromTryingToSend(taggedNode)
+	if isNetErr(err) {
+		t.Fatalf("tagged node trying to send should not get a net.Error, got: %v", err)
+	}
+}
+
+func TestOnlyTaggedPeersCanBeDialed(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, control, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+
+	// make a StreamLayer for ps[0]
+	ts := ps[0].ts
+	auth := newAuthorization(ts, clusterTag)
+
+	port := 19841
+	lns := make([]net.Listener, 3)
+	for i, p := range ps {
+		ln, err := p.ts.Listen("tcp", fmt.Sprintf(":%d", port))
+		if err != nil {
+			t.Fatal(err)
+		}
+		lns[i] = ln
+	}
+
+	sl := StreamLayer{
+		s:           ts,
+		Listener:    lns[0],
+		auth:        auth,
+		shutdownCtx: ctx,
+	}
+
+	ip1, _ := ps[1].ts.TailscaleIPs()
+	a1 := raft.ServerAddress(fmt.Sprintf("%s:%d", ip1, port))
+
+	ip2, _ := ps[2].ts.TailscaleIPs()
+	a2 := raft.ServerAddress(fmt.Sprintf("%s:%d", ip2, port))
+
+	// both can be dialed...
+	conn, err := sl.Dial(a1, 2*time.Second)
+	if err != nil {
+		t.Fatal(err)
+	}
+	conn.Close()
+
+	conn, err = sl.Dial(a2, 2*time.Second)
+	if err != nil {
+		t.Fatal(err)
+	}
+	conn.Close()
+
+	// untag ps[2]
+	tagNodes(t, control, []key.NodePublic{ps[2].key}, "")
+	waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{ps[2].key}, "")
+
+	// now only ps[1] can be dialed
+	conn, err = sl.Dial(a1, 2*time.Second)
+	if err != nil {
+		t.Fatal(err)
+	}
+	conn.Close()
+
+	_, err = sl.Dial(a2, 2*time.Second)
+	if err == nil || err.Error() != "peer is not allowed" {
+		t.Fatalf("expected peer is not allowed, got: %v", err)
+	}
+}
+
+func TestOnlyTaggedPeersCanJoin(t *testing.T) {
+	testConfig(t)
+	ctx := context.Background()
+	clusterTag := "tag:whatever"
+	ps, _, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
+	cfg := warnLogConfig()
+	createConsensusCluster(t, ctx, clusterTag, ps, cfg)
+	for _, p := range ps {
+		defer p.c.Stop(ctx)
+	}
+
+	tsJoiner, _, _ := startNode(t, ctx, controlURL, "joiner node")
+
+	ipv4, _ := tsJoiner.TailscaleIPs()
+	url := fmt.Sprintf("http://%s/join", ps[0].c.commandAddr(ps[0].c.self.hostAddr))
+	payload, err := json.Marshal(joinRequest{
+		RemoteHost: ipv4.String(),
+		RemoteID:   "node joiner",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	body := bytes.NewBuffer(payload)
+	req, err := http.NewRequest("POST", url, body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	resp, err := tsJoiner.HTTPClient().Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusForbidden {
+		t.Fatalf("join req when not tagged, expected status: %d, got: %d", http.StatusForbidden, resp.StatusCode)
+	}
+	rBody, _ := io.ReadAll(resp.Body)
+	sBody := strings.TrimSpace(string(rBody))
+	expected := "peer not allowed"
+	if sBody != expected {
+		t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
+	}
+}