mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:17:32 +00:00
feat: port reduction (#323)
* move mgmt pkg * begin package restructure * rename auth package to authz * begin start api * move auth * move admin * fix merge * configs and interceptors * interceptor * revert generate-grpc.sh * some cleanups * console * move console * fix tests and merging * js linting * merge * merging and configs * change k8s base to current ports * fixes * cleanup * regenerate proto * remove unnecessary whitespace * missing param * go mod tidy * fix merging * move login pkg * cleanup * move api pkgs again * fix pkg naming * fix generate-static.sh for login * update workflow * fixes * logging * remove duplicate * comment for optional gateway interfaces * regenerate protos * fix proto imports for grpc web * protos * grpc web generate * grpc web generate * fix changes * add translation interceptor * fix merging * regenerate mgmt proto
This commit is contained in:
@@ -3,12 +3,14 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/caos/logging"
|
||||
|
||||
grpc_util "github.com/caos/zitadel/internal/api/grpc"
|
||||
client_middleware "github.com/caos/zitadel/internal/api/grpc/client/middleware"
|
||||
http_util "github.com/caos/zitadel/internal/api/http"
|
||||
http_mw "github.com/caos/zitadel/internal/api/http/middleware"
|
||||
@@ -22,26 +24,41 @@ const (
|
||||
var (
|
||||
DefaultJSONMarshaler = &runtime.JSONPb{OrigName: false, EmitDefaults: false}
|
||||
|
||||
DefaultServeMuxOptions = []runtime.ServeMuxOption{
|
||||
runtime.WithMarshalerOption(DefaultJSONMarshaler.ContentType(), DefaultJSONMarshaler),
|
||||
runtime.WithMarshalerOption(mimeWildcard, DefaultJSONMarshaler),
|
||||
runtime.WithMarshalerOption(runtime.MIMEWildcard, DefaultJSONMarshaler),
|
||||
runtime.WithIncomingHeaderMatcher(runtime.DefaultHeaderMatcher),
|
||||
runtime.WithOutgoingHeaderMatcher(runtime.DefaultHeaderMatcher),
|
||||
DefaultServeMuxOptions = func(customHeaders ...string) []runtime.ServeMuxOption {
|
||||
return []runtime.ServeMuxOption{
|
||||
runtime.WithMarshalerOption(DefaultJSONMarshaler.ContentType(), DefaultJSONMarshaler),
|
||||
runtime.WithMarshalerOption(mimeWildcard, DefaultJSONMarshaler),
|
||||
runtime.WithMarshalerOption(runtime.MIMEWildcard, DefaultJSONMarshaler),
|
||||
runtime.WithIncomingHeaderMatcher(DefaultHeaderMatcher(customHeaders...)),
|
||||
runtime.WithOutgoingHeaderMatcher(runtime.DefaultHeaderMatcher),
|
||||
}
|
||||
}
|
||||
|
||||
DefaultHeaderMatcher = func(customHeaders ...string) runtime.HeaderMatcherFunc {
|
||||
return func(header string) (string, bool) {
|
||||
for _, customHeader := range customHeaders {
|
||||
if strings.HasPrefix(strings.ToLower(header), customHeader) {
|
||||
return header, true
|
||||
}
|
||||
}
|
||||
return runtime.DefaultHeaderMatcher(header)
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
type Gateway interface {
|
||||
GRPCEndpoint() string
|
||||
GatewayPort() string
|
||||
Gateway() GatewayFunc
|
||||
RegisterGateway() GatewayFunc
|
||||
GatewayPathPrefix() string
|
||||
}
|
||||
|
||||
type GatewayFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
|
||||
|
||||
//optional extending interfaces of Gateway below
|
||||
|
||||
type gatewayCustomServeMuxOptions interface {
|
||||
GatewayServeMuxOptions() []runtime.ServeMuxOption
|
||||
}
|
||||
|
||||
type grpcGatewayCustomInterceptor interface {
|
||||
GatewayHTTPInterceptor(http.Handler) http.Handler
|
||||
}
|
||||
@@ -50,28 +67,62 @@ type gatewayCustomCallOptions interface {
|
||||
GatewayCallOptions() []grpc.DialOption
|
||||
}
|
||||
|
||||
func StartGateway(ctx context.Context, g Gateway) {
|
||||
mux := createMux(ctx, g)
|
||||
serveGateway(ctx, mux, gatewayPort(g.GatewayPort()), g)
|
||||
type GatewayHandler struct {
|
||||
mux *http.ServeMux
|
||||
serverPort string
|
||||
gatewayPort string
|
||||
customHeaders []string
|
||||
}
|
||||
|
||||
func createMux(ctx context.Context, g Gateway) *runtime.ServeMux {
|
||||
muxOptions := DefaultServeMuxOptions
|
||||
func CreateGatewayHandler(config grpc_util.Config) *GatewayHandler {
|
||||
return &GatewayHandler{
|
||||
mux: http.NewServeMux(),
|
||||
serverPort: config.ServerPort,
|
||||
gatewayPort: config.GatewayPort,
|
||||
customHeaders: config.CustomHeaders,
|
||||
}
|
||||
}
|
||||
|
||||
//RegisterGateway registers a handler (Gateway interface) on defined port
|
||||
//Gateway interface may be extended with optional implementation of interfaces (gatewayCustomServeMuxOptions, ...)
|
||||
func (g *GatewayHandler) RegisterGateway(ctx context.Context, gateway Gateway) {
|
||||
handler := createGateway(ctx, gateway, g.serverPort, g.customHeaders...)
|
||||
prefix := gateway.GatewayPathPrefix()
|
||||
g.RegisterHandler(prefix, handler)
|
||||
}
|
||||
|
||||
func (g *GatewayHandler) RegisterHandler(prefix string, handler http.Handler) {
|
||||
http_util.RegisterHandler(g.mux, prefix, handler)
|
||||
}
|
||||
|
||||
func (g *GatewayHandler) Serve(ctx context.Context) {
|
||||
http_util.Serve(ctx, g.mux, g.gatewayPort, "api")
|
||||
}
|
||||
|
||||
func createGateway(ctx context.Context, g Gateway, port string, customHeaders ...string) http.Handler {
|
||||
mux := createMux(g, customHeaders...)
|
||||
opts := createDialOptions(g)
|
||||
err := g.RegisterGateway()(ctx, mux, http_util.Endpoint(port), opts)
|
||||
logging.Log("SERVE-7B7G0E").OnError(err).Panic("failed to register grpc gateway")
|
||||
return addInterceptors(mux, g)
|
||||
}
|
||||
|
||||
func createMux(g Gateway, customHeaders ...string) *runtime.ServeMux {
|
||||
muxOptions := DefaultServeMuxOptions(customHeaders...)
|
||||
if customOpts, ok := g.(gatewayCustomServeMuxOptions); ok {
|
||||
muxOptions = customOpts.GatewayServeMuxOptions()
|
||||
}
|
||||
mux := runtime.NewServeMux(muxOptions...)
|
||||
return runtime.NewServeMux(muxOptions...)
|
||||
}
|
||||
|
||||
func createDialOptions(g Gateway) []grpc.DialOption {
|
||||
opts := []grpc.DialOption{grpc.WithInsecure()}
|
||||
opts = append(opts, client_middleware.DefaultTracingStatsClient())
|
||||
|
||||
if customOpts, ok := g.(gatewayCustomCallOptions); ok {
|
||||
opts = append(opts, customOpts.GatewayCallOptions()...)
|
||||
}
|
||||
err := g.Gateway()(ctx, mux, g.GRPCEndpoint(), opts)
|
||||
logging.Log("SERVE-7B7G0E").OnError(err).Panic("failed to create mux for grpc gateway")
|
||||
|
||||
return mux
|
||||
return opts
|
||||
}
|
||||
|
||||
func addInterceptors(handler http.Handler, g Gateway) http.Handler {
|
||||
@@ -83,26 +134,6 @@ func addInterceptors(handler http.Handler, g Gateway) http.Handler {
|
||||
return http_mw.CORSInterceptorOpts(http_mw.DefaultCORSOptions, handler)
|
||||
}
|
||||
|
||||
func serveGateway(ctx context.Context, handler http.Handler, port string, g Gateway) {
|
||||
server := &http.Server{
|
||||
Handler: addInterceptors(handler, g),
|
||||
}
|
||||
|
||||
listener := http_util.CreateListener(port)
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
err := server.Shutdown(ctx)
|
||||
logging.Log("SERVE-m7kBlq").OnError(err).Warn("error during graceful shutdown of grpc gateway")
|
||||
}()
|
||||
|
||||
go func() {
|
||||
err := server.Serve(listener)
|
||||
logging.Log("SERVE-tBHR60").OnError(err).Panic("grpc gateway serve failed")
|
||||
}()
|
||||
logging.LogWithFields("SERVE-KHh0Cb", "port", port).Info("grpc gateway is listening")
|
||||
}
|
||||
|
||||
func gatewayPort(port string) string {
|
||||
if port == "" {
|
||||
return defaultGatewayPort
|
||||
|
Reference in New Issue
Block a user