add tracing and refactor some api pkgs

This commit is contained in:
Livio Amstutz
2020-03-24 14:15:01 +01:00
parent a5ca611dcc
commit 96b88f5d8c
26 changed files with 1198 additions and 12 deletions

View File

@@ -0,0 +1,38 @@
package middleware
import (
"context"
"strings"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
"github.com/caos/zitadel/internal/api"
"github.com/caos/zitadel/internal/tracing"
)
type GRPCMethod string
func TracingStatsClient(ignoredMethods ...GRPCMethod) grpc.DialOption {
return grpc.WithStatsHandler(&tracingClientHandler{ignoredMethods, ocgrpc.ClientHandler{StartOptions: trace.StartOptions{Sampler: tracing.Sampler(), SpanKind: trace.SpanKindClient}}})
}
func DefaultTracingStatsClient() grpc.DialOption {
return TracingStatsClient(api.Healthz, api.Readiness, api.Validation)
}
type tracingClientHandler struct {
IgnoredMethods []GRPCMethod
ocgrpc.ClientHandler
}
func (s *tracingClientHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
for _, method := range s.IgnoredMethods {
if strings.HasSuffix(tagInfo.FullMethodName, string(method)) {
return ctx
}
}
return s.ClientHandler.TagRPC(ctx, tagInfo)
}

View File

@@ -0,0 +1,110 @@
package server
import (
"context"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"github.com/caos/logging"
client_middleware "github.com/caos/zitadel/internal/api/grpc/client/middleware"
http_util "github.com/caos/zitadel/internal/api/http"
http_mw "github.com/caos/zitadel/internal/api/http/middleware"
)
const (
defaultGatewayPort = "8080"
mimeWildcard = "*/*"
)
var (
DefaultJSONMarshaler = &runtime.JSONPb{OrigName: false, EmitDefaults: false}
DefaultServeMuxOptions = []runtime.ServeMuxOption{
runtime.WithMarshalerOption(DefaultJSONMarshaler.ContentType(), DefaultJSONMarshaler),
runtime.WithMarshalerOption(mimeWildcard, DefaultJSONMarshaler),
runtime.WithMarshalerOption(runtime.MIMEWildcard, DefaultJSONMarshaler),
runtime.WithIncomingHeaderMatcher(runtime.DefaultHeaderMatcher),
runtime.WithOutgoingHeaderMatcher(runtime.DefaultHeaderMatcher),
}
)
type Gateway interface {
GRPCEndpoint() string
GatewayPort() string
Gateway() GatewayFunc
}
type GatewayFunc func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
type gatewayCustomServeMuxOptions interface {
GatewayServeMuxOptions() []runtime.ServeMuxOption
}
type grpcGatewayCustomInterceptor interface {
GatewayHTTPInterceptor(http.Handler) http.Handler
}
type gatewayCustomCallOptions interface {
GatewayCallOptions() []grpc.DialOption
}
func StartGateway(ctx context.Context, g Gateway) {
mux := createMux(ctx, g)
serveGateway(ctx, mux, gatewayPort(g.GatewayPort()), g)
}
func createMux(ctx context.Context, g Gateway) *runtime.ServeMux {
muxOptions := DefaultServeMuxOptions
if customOpts, ok := g.(gatewayCustomServeMuxOptions); ok {
muxOptions = customOpts.GatewayServeMuxOptions()
}
mux := runtime.NewServeMux(muxOptions...)
opts := []grpc.DialOption{grpc.WithInsecure()}
opts = append(opts, client_middleware.DefaultTracingStatsClient())
if customOpts, ok := g.(gatewayCustomCallOptions); ok {
opts = append(opts, customOpts.GatewayCallOptions()...)
}
err := g.Gateway()(ctx, mux, g.GRPCEndpoint(), opts)
logging.Log("SERVE-7B7G0E").OnError(err).Panic("failed to create mux for grpc gateway")
return mux
}
func addInterceptors(handler http.Handler, g Gateway) http.Handler {
handler = http_mw.DefaultTraceHandler(handler)
if interceptor, ok := g.(grpcGatewayCustomInterceptor); ok {
handler = interceptor.GatewayHTTPInterceptor(handler)
}
return http_mw.CORSInterceptorOpts(http_mw.DefaultCORSOptions, handler)
}
func serveGateway(ctx context.Context, handler http.Handler, port string, g Gateway) {
server := &http.Server{
Handler: addInterceptors(handler, g),
}
listener := http_util.CreateListener(port)
go func() {
<-ctx.Done()
err := server.Shutdown(ctx)
logging.Log("SERVE-m7kBlq").OnError(err).Warn("error during graceful shutdown of grpc gateway")
}()
go func() {
err := server.Serve(listener)
logging.Log("SERVE-tBHR60").OnError(err).Panic("grpc gateway serve failed")
}()
logging.LogWithFields("SERVE-KHh0Cb", "port", port).Info("grpc gateway is listening")
}
func gatewayPort(port string) string {
if port == "" {
return defaultGatewayPort
}
return port
}

View File

@@ -1,4 +1,4 @@
package grpc
package middleware
import (
"context"
@@ -9,6 +9,7 @@ import (
"github.com/caos/zitadel/internal/api"
"github.com/caos/zitadel/internal/api/auth"
grpc_util "github.com/caos/zitadel/internal/api/grpc"
)
func AuthorizationInterceptor(verifier auth.TokenVerifier, authConfig *auth.Config, authMethods auth.MethodMapping) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
@@ -18,12 +19,12 @@ func AuthorizationInterceptor(verifier auth.TokenVerifier, authConfig *auth.Conf
return handler(ctx, req)
}
authToken := GetAuthorizationHeader(ctx)
authToken := grpc_util.GetAuthorizationHeader(ctx)
if authToken == "" {
return nil, status.Error(codes.Unauthenticated, "auth header missing")
}
orgID := GetHeader(ctx, api.ZitadelOrgID)
orgID := grpc_util.GetHeader(ctx, api.ZitadelOrgID)
ctx, err := auth.CheckUserAuthorization(ctx, req, authToken, orgID, verifier, authConfig, authOpt)
if err != nil {

View File

@@ -1,14 +1,16 @@
package grpc
package middleware
import (
"context"
"google.golang.org/grpc"
grpc_util "github.com/caos/zitadel/internal/api/grpc"
)
func ErrorHandler() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
return resp, CaosToGRPCError(err)
return resp, grpc_util.CaosToGRPCError(err)
}
}

View File

@@ -0,0 +1,33 @@
package middleware
import (
"context"
"strings"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
"github.com/caos/zitadel/internal/tracing"
)
type GRPCMethod string
func TracingStatsServer(ignoredMethods ...GRPCMethod) grpc.ServerOption {
return grpc.StatsHandler(&tracingServerHandler{ignoredMethods, ocgrpc.ServerHandler{StartOptions: trace.StartOptions{Sampler: tracing.Sampler(), SpanKind: trace.SpanKindServer}}})
}
type tracingServerHandler struct {
IgnoredMethods []GRPCMethod
ocgrpc.ServerHandler
}
func (s *tracingServerHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
for _, method := range s.IgnoredMethods {
if strings.HasSuffix(tagInfo.FullMethodName, string(method)) {
return ctx
}
}
return s.ServerHandler.TagRPC(ctx, tagInfo)
}

View File

@@ -1,4 +1,4 @@
package grpc
package server
import (
"context"

View File

@@ -0,0 +1,53 @@
package server
import (
"context"
"net"
"github.com/caos/logging"
"google.golang.org/grpc"
"github.com/caos/zitadel/internal/api/http"
)
const (
defaultGrpcPort = "80"
)
type Server interface {
GRPCPort() string
GRPCServer() (*grpc.Server, error)
}
func StartServer(ctx context.Context, s Server) {
port := grpcPort(s.GRPCPort())
listener := http.CreateListener(port)
server := createGrpcServer(s)
serveServer(ctx, server, listener, port)
}
func createGrpcServer(s Server) *grpc.Server {
grpcServer, err := s.GRPCServer()
logging.Log("SERVE-k280HZ").OnError(err).Panic("failed to create grpc server")
return grpcServer
}
func serveServer(ctx context.Context, server *grpc.Server, listener net.Listener, port string) {
go func() {
<-ctx.Done()
server.GracefulStop()
}()
go func() {
err := server.Serve(listener)
logging.Log("SERVE-Ga3e94").OnError(err).Panic("grpc server serve failed")
}()
logging.LogWithFields("SERVE-bZ44QM", "port", port).Info("grpc server is listening")
}
func grpcPort(port string) string {
if port == "" {
return defaultGrpcPort
}
return port
}