readiness via socket

This commit is contained in:
Elio Bischof 2024-09-26 03:25:36 +02:00
parent 6893679f69
commit c5ee9464f1
No known key found for this signature in database
GPG Key ID: 7B383FDE4DDBF1BD
14 changed files with 308 additions and 57 deletions

View File

@ -11,14 +11,17 @@ RUN useradd -s "" --home / zitadel && \
chown zitadel /app/zitadel && \
chmod +x /app/zitadel && \
chown zitadel /app/entrypoint.sh && \
chmod +x /app/entrypoint.sh
chmod +x /app/entrypoint.sh && \
mkdir /emptytmp
WORKDIR /app
ENV PATH="/app:${PATH}"
HEALTHCHECK CMD /app/zitadel ready
USER zitadel
HEALTHCHECK --interval=5s --timeout=120s --retries=6 \
CMD ["/app/zitadel", "ready"]
ENTRYPOINT ["/app/entrypoint.sh"]
FROM --platform=$TARGETPLATFORM scratch as final
@ -26,9 +29,13 @@ ARG TARGETPLATFORM
COPY --from=artifact /etc/passwd /etc/passwd
COPY --from=artifact /etc/ssl/certs /etc/ssl/certs
COPY --from=artifact /app/zitadel /app/zitadel
HEALTHCHECK CMD /app/zitadel ready
COPY --from=artifact --chown=zitadel:zitadel /app/zitadel /app/zitadel
COPY --from=artifact --chown=zitadel:zitadel /emptytmp /tmp
USER zitadel
# /app/zitadel ready is a healthcheck endpoint that immediately
HEALTHCHECK --interval=5s --timeout=600s --retries=3 \
CMD ["/app/zitadel", "ready"]
ENTRYPOINT ["/app/zitadel"]

View File

@ -26,8 +26,8 @@ func New() *cobra.Command {
adminCMD.AddCommand(
initialise.New(),
setup.New(),
start.New(nil),
start.NewStartFromInit(nil),
start.New(),
start.NewStartFromInit(),
key.New(),
)

View File

@ -3,10 +3,10 @@ package initialise
import (
"context"
"embed"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/socket"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/dialect"
@ -72,9 +72,13 @@ func InitAll(ctx context.Context, config *Config) {
}
func initialise(config database.Config, steps ...func(*database.DB) error) error {
closeSocket, err := socket.ListenAndIgnore()
logging.OnError(err).Fatal("unable to listen on socket")
defer closeSocket()
logging.Info("initialization started")
err := ReadStmts(config.Type())
err = ReadStmts(config.Type())
if err != nil {
return err
}

View File

@ -2,14 +2,15 @@ package ready
import (
"crypto/tls"
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/socket"
"net"
"net/http"
"os"
"strconv"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/zitadel/logging"
)
func New() *cobra.Command {
@ -27,6 +28,33 @@ func New() *cobra.Command {
}
func ready(config *Config) bool {
explicitErr := tryToCheckExplicitly(config)
if explicitErr == nil {
logging.Info("ready check passed")
return true
}
socketErr := expectTrueFromSocket(socket.ReadinessQuery)
if socketErr == nil {
logging.Info("ready check passed")
return true
}
logging.Warnf("ready check failed: %v", explicitErr)
logging.Warnf("ready check failed: %v", socketErr)
return false
}
func expectTrueFromSocket(query socket.SocketRequest) error {
resp, err := query.Request()
if err != nil {
return fmt.Errorf("socket request error: %w", err)
}
if resp != socket.True {
return fmt.Errorf("zitadel process did not respond true to a readiness query")
}
return nil
}
func tryToCheckExplicitly(config *Config) error {
scheme := "https"
if !config.TLS.Enabled {
scheme = "http"
@ -35,13 +63,11 @@ func ready(config *Config) bool {
httpClient := http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
res, err := httpClient.Get(scheme + "://" + net.JoinHostPort("localhost", strconv.Itoa(int(config.Port))) + "/debug/ready")
if err != nil {
logging.WithError(err).Warn("ready check failed")
return false
return fmt.Errorf("url error: %w", err)
}
defer res.Body.Close()
if res.StatusCode != 200 {
logging.WithFields("status", res.StatusCode).Warn("ready check failed")
return false
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
}
return true
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"embed"
_ "embed"
"github.com/zitadel/zitadel/internal/socket"
"net/http"
"github.com/spf13/cobra"
@ -53,7 +54,11 @@ func New() *cobra.Command {
Requirements:
- cockroachdb`,
Run: func(cmd *cobra.Command, args []string) {
err := tls.ModeFromFlag(cmd)
closeSocket, err := socket.ListenAndIgnore()
logging.OnError(err).Fatal("unable to listen on socket")
defer closeSocket()
err = tls.ModeFromFlag(cmd)
logging.OnError(err).Fatal("invalid tlsMode")
err = BindInitProjections(cmd)

View File

@ -5,6 +5,7 @@ import (
"crypto/tls"
_ "embed"
"fmt"
"github.com/zitadel/zitadel/internal/socket"
"math"
"net/http"
"os"
@ -85,7 +86,7 @@ import (
"github.com/zitadel/zitadel/internal/logstore/emitters/execution"
"github.com/zitadel/zitadel/internal/logstore/emitters/stdout"
"github.com/zitadel/zitadel/internal/logstore/record"
"github.com/zitadel/zitadel/internal/net"
znet "github.com/zitadel/zitadel/internal/net"
"github.com/zitadel/zitadel/internal/notification"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/static"
@ -95,7 +96,7 @@ import (
"github.com/zitadel/zitadel/openapi"
)
func New(server chan<- *Server) *cobra.Command {
func New() *cobra.Command {
start := &cobra.Command{
Use: "start",
Short: "starts ZITADEL instance",
@ -103,7 +104,12 @@ func New(server chan<- *Server) *cobra.Command {
Requirements:
- cockroachdb`,
RunE: func(cmd *cobra.Command, args []string) error {
err := cmd_tls.ModeFromFlag(cmd)
startup, closeSocket, err := listenSocket(cmd.Context())
if err != nil {
return err
}
defer closeSocket()
err = cmd_tls.ModeFromFlag(cmd)
if err != nil {
return err
}
@ -112,7 +118,7 @@ Requirements:
if err != nil {
return err
}
return startZitadel(cmd.Context(), config, masterKey, server)
return startZitadel(cmd.Context(), config, masterKey, startup)
},
}
@ -136,6 +142,8 @@ type Server struct {
Shutdown chan<- os.Signal
}
const socketPath = "/tmp/zitadel.sock"
func startZitadel(ctx context.Context, config *Config, masterKey string, server chan<- *Server) error {
showBasicInformation(config)
@ -288,7 +296,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
if err != nil {
return err
}
api, err := startAPIs(
apis, err := startAPIs(
ctx,
clock,
router,
@ -305,13 +313,12 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
if err != nil {
return err
}
commands.GrpcMethodExisting = checkExisting(api.ListGrpcMethods())
commands.GrpcServiceExisting = checkExisting(api.ListGrpcServices())
commands.GrpcMethodExisting = checkExisting(apis.ListGrpcMethods())
commands.GrpcServiceExisting = checkExisting(apis.ListGrpcServices())
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
if server != nil {
go func() {
server <- &Server{
Config: config,
DB: queryDBClient,
@ -327,8 +334,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
Shutdown: shutdown,
}
close(server)
}
}()
return listen(ctx, router, config.Port, tlsConfig, shutdown)
}
@ -544,14 +550,12 @@ func listen(ctx context.Context, router *mux.Router, port uint16, tlsConfig *tls
http2Server := &http2.Server{}
http1Server := &http.Server{Handler: h2c.NewHandler(router, http2Server), TLSConfig: tlsConfig}
lc := net.ListenConfig()
lc := znet.ListenConfig()
lis, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", port))
if err != nil {
return fmt.Errorf("tcp listener on %d failed: %w", port, err)
}
errCh := make(chan error)
go func() {
logging.Infof("server is listening on %s", lis.Addr().String())
if tlsConfig != nil {
@ -561,7 +565,6 @@ func listen(ctx context.Context, router *mux.Router, port uint16, tlsConfig *tls
errCh <- http1Server.Serve(lis)
}
}()
select {
case err := <-errCh:
return fmt.Errorf("error starting server: %w", err)
@ -616,3 +619,18 @@ func checkExisting(values []string) func(string) bool {
return slices.Contains(values, value)
}
}
func listenSocket(ctx context.Context) (chan<- *Server, func() error, error) {
return socket.Listen(func(server *Server, request socket.SocketRequest) (socket.SocketResponse, error) {
switch request {
case socket.ReadinessQuery:
if readyErr := server.Queries.Health(ctx); readyErr != nil {
logging.Warnf("readiness check failed: %v", readyErr)
return socket.False, nil
}
return socket.True, nil
default:
return socket.UnknownRequest, fmt.Errorf("unknown request: %d", request)
}
})
}

View File

@ -11,7 +11,7 @@ import (
"github.com/zitadel/zitadel/cmd/tls"
)
func NewStartFromInit(server chan<- *Server) *cobra.Command {
func NewStartFromInit() *cobra.Command {
cmd := &cobra.Command{
Use: "start-from-init",
Short: "cold starts zitadel",
@ -23,7 +23,11 @@ Last ZITADEL starts.
Requirements:
- cockroachdb`,
Run: func(cmd *cobra.Command, args []string) {
err := tls.ModeFromFlag(cmd)
startup, closeSocket, err := listenSocket(cmd.Context())
logging.OnError(err).Fatal("unable to listen on socket")
defer closeSocket()
err = tls.ModeFromFlag(cmd)
logging.OnError(err).Fatal("invalid tlsMode")
masterKey, err := key.MasterKey(cmd)
@ -40,7 +44,7 @@ Requirements:
startConfig := MustNewConfig(viper.GetViper())
err = startZitadel(cmd.Context(), startConfig, masterKey, server)
err = startZitadel(cmd.Context(), startConfig, masterKey, startup)
logging.OnError(err).Fatal("unable to start zitadel")
},
}

View File

@ -10,7 +10,7 @@ import (
"github.com/zitadel/zitadel/cmd/tls"
)
func NewStartFromSetup(server chan<- *Server) *cobra.Command {
func NewStartFromSetup() *cobra.Command {
cmd := &cobra.Command{
Use: "start-from-setup",
Short: "cold starts zitadel",
@ -23,7 +23,11 @@ Requirements:
- database is initialized
`,
Run: func(cmd *cobra.Command, args []string) {
err := tls.ModeFromFlag(cmd)
startup, closeSocket, err := listenSocket(cmd.Context())
logging.OnError(err).Fatal("unable to listen on socket")
defer closeSocket()
err = tls.ModeFromFlag(cmd)
logging.OnError(err).Fatal("invalid tlsMode")
masterKey, err := key.MasterKey(cmd)
@ -38,7 +42,7 @@ Requirements:
startConfig := MustNewConfig(viper.GetViper())
err = startZitadel(cmd.Context(), startConfig, masterKey, server)
err = startZitadel(cmd.Context(), startConfig, masterKey, startup)
logging.OnError(err).Fatal("unable to start zitadel")
},
}

View File

@ -28,7 +28,7 @@ var (
defaultConfig []byte
)
func New(out io.Writer, in io.Reader, args []string, server chan<- *start.Server) *cobra.Command {
func New(out io.Writer, in io.Reader, args []string) *cobra.Command {
cmd := &cobra.Command{
Use: "zitadel",
Short: "The ZITADEL CLI lets you interact with ZITADEL",
@ -53,9 +53,9 @@ func New(out io.Writer, in io.Reader, args []string, server chan<- *start.Server
admin.New(), //is now deprecated, remove later on
initialise.New(),
setup.New(),
start.New(server),
start.NewStartFromInit(server),
start.NewStartFromSetup(server),
start.New(),
start.NewStartFromInit(),
start.NewStartFromSetup(),
mirror.New(&configFiles),
key.New(),
ready.New(),

44
docker-compose.yaml Normal file
View File

@ -0,0 +1,44 @@
version: '3.8'
services:
zitadel:
restart: 'always'
networks:
- 'zitadel'
image: zitadel:local
command: 'start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled'
environment:
- 'ZITADEL_DATABASE_POSTGRES_HOST=db'
- 'ZITADEL_DATABASE_POSTGRES_PORT=5432'
- 'ZITADEL_DATABASE_POSTGRES_DATABASE=zitadel'
- 'ZITADEL_DATABASE_POSTGRES_USER_USERNAME=zitadel'
- 'ZITADEL_DATABASE_POSTGRES_USER_PASSWORD=zitadel'
- 'ZITADEL_DATABASE_POSTGRES_USER_SSL_MODE=disable'
- 'ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME=postgres'
- 'ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD=postgres'
- 'ZITADEL_DATABASE_POSTGRES_ADMIN_SSL_MODE=disable'
- 'ZITADEL_EXTERNALSECURE=false'
depends_on:
db:
condition: 'service_healthy'
ports:
- '8080:8080'
db:
restart: 'always'
image: postgres:16-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=zitadel
networks:
- 'zitadel'
healthcheck:
test: ["CMD-SHELL", "pg_isready", "-d", "zitadel", "-U", "postgres"]
interval: '10s'
timeout: '30s'
retries: 5
start_period: '20s'
networks:
zitadel:

View File

@ -3,6 +3,7 @@ package api
import (
"context"
"crypto/tls"
"errors"
"net/http"
"sort"
"strings"
@ -37,6 +38,7 @@ type API struct {
healthServer *health.Server
accessInterceptor *http_mw.AccessInterceptor
queries *query.Queries
acceptingTraffic bool
}
func (a *API) ListGrpcServices() []string {
@ -218,21 +220,29 @@ func (a *API) routeGRPCWeb() {
Name("grpc-web")
}
func (a *API) healthHandler() http.Handler {
checks := []ValidationFunction{
func(ctx context.Context) error {
if err := a.health.Health(ctx); err != nil {
return zerrors.ThrowInternal(err, "API-F24h2", "DB CONNECTION ERROR")
}
return nil
},
func (a *API) IsReady(ctx context.Context) error {
return errors.Join(validate(ctx, a.readinessChecks())...)
}
func (a *API) checkDBPing(ctx context.Context) error {
if err := a.health.Health(ctx); err != nil {
return zerrors.ThrowInternal(err, "API-F24h2", "DB CONNECTION ERROR")
}
return nil
}
func (a *API) readinessChecks() []ValidationFunction {
return []ValidationFunction{
a.checkDBPing,
}
}
func (a *API) healthHandler() http.Handler {
handler := http.NewServeMux()
handler.HandleFunc("/healthz", handleHealth)
handler.HandleFunc("/ready", handleReadiness(checks))
handler.HandleFunc("/validate", handleValidate(checks))
handler.HandleFunc("/ready", handleReadiness(a.readinessChecks()))
handler.HandleFunc("/validate", handleValidate(a.readinessChecks()))
handler.Handle("/metrics", metricsExporter())
return handler
}

60
internal/socket/listen.go Normal file
View File

@ -0,0 +1,60 @@
package socket
import (
"fmt"
"net"
"os"
"github.com/zitadel/logging"
)
func Listen[T any](handleFunc HandleFunc[T]) (chan<- T, func() error, error) {
listener, err := listen()
if err != nil {
return nil, func() error { return nil }, fmt.Errorf("cannot start socket listener: %w", err)
}
serverStartedUp := make(chan T)
go acceptSocketConnections[T](listener, serverStartedUp, handleFunc)
return serverStartedUp, listener.Close, nil
}
func ListenAndIgnore() (func() error, error) {
listener, err := listen()
if err != nil {
return func() error { return nil }, fmt.Errorf("cannot start socket listener: %w", err)
}
go acceptSocketConnections[any](listener, make(chan any), nil)
return listener.Close, nil
}
func listen() (net.Listener, error) {
if err := os.Remove(Path); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("cannot remove socket file: %w", err)
}
return net.Listen("unix", Path)
}
type Handler func(request SocketRequest) (SocketResponse, error)
func acceptSocketConnections[T any](listener net.Listener, startupDone <-chan T, handle HandleFunc[T]) {
var server T
for {
conn, err := listener.Accept()
if err != nil {
logging.Errorf("accept socket error: %v", err)
continue
}
if server == nil {
server = <-startupDone
}
if handle == nil {
conn.Close()
return
}
go func() {
if handleErr := respond(conn, handle, server); handleErr != nil {
logging.Errorf("socket handle error: %v", handleErr)
}
}()
}
}

69
internal/socket/socket.go Normal file
View File

@ -0,0 +1,69 @@
package socket
import (
"fmt"
"io"
"net"
)
const Path = "/tmp/zitadel.sock"
type SocketRequest byte
const (
unknown SocketRequest = iota
ReadinessQuery
)
type SocketResponse byte
const (
unknownResponse SocketResponse = iota
UnknownRequest
True
False
)
func (s SocketRequest) Request() (resp SocketResponse, err error) {
conn, err := net.Dial("unix", Path)
if err != nil {
return resp, fmt.Errorf("dial error: %w", err)
}
defer conn.Close()
_, err = conn.Write([]byte{byte(s)})
if err != nil {
return resp, fmt.Errorf("write error: %w", err)
}
response, err := io.ReadAll(conn)
if err != nil {
return resp, fmt.Errorf("read error: %w", err)
}
if len(response) != 1 {
return resp, fmt.Errorf("invalid response length")
}
return SocketResponse(response[0]), nil
}
type HandleFunc[T any] func(T, SocketRequest) (SocketResponse, error)
func respond[T any](conn net.Conn, handler HandleFunc[T], server T) error {
defer conn.Close()
buf := make([]byte, 1)
_, err := conn.Read(buf)
if err != nil {
return fmt.Errorf("could not read from socket: %v", err)
}
if len(buf) != 1 {
return fmt.Errorf("invalid request length: %d", len(buf))
}
req := SocketRequest(buf[0])
resp, err := handler(server, req)
if err != nil {
return fmt.Errorf("handler error: %w", err)
}
_, err = conn.Write([]byte{byte(resp)})
if err != nil {
return fmt.Errorf("could not write response: %v", err)
}
return nil
}

View File

@ -10,6 +10,6 @@ import (
func main() {
args := os.Args[1:]
rootCmd := cmd.New(os.Stdout, os.Stdin, args, nil)
rootCmd := cmd.New(os.Stdout, os.Stdin, args)
cobra.CheckErr(rootCmd.Execute())
}