Merge remote-tracking branch 'origin/master' into new-eventstore

This commit is contained in:
adlerhurst
2020-11-23 11:40:50 +01:00
61 changed files with 449 additions and 982 deletions

View File

@@ -14,6 +14,7 @@ import (
client_middleware "github.com/caos/zitadel/internal/api/grpc/client/middleware"
http_util "github.com/caos/zitadel/internal/api/http"
http_mw "github.com/caos/zitadel/internal/api/http/middleware"
"github.com/caos/zitadel/internal/tracing"
)
const (
@@ -103,7 +104,7 @@ func createGateway(ctx context.Context, g Gateway, port string, customHeaders ..
mux := createMux(g, customHeaders...)
opts := createDialOptions(g)
err := g.RegisterGateway()(ctx, mux, http_util.Endpoint(port), opts)
logging.Log("SERVE-7B7G0E").OnError(err).Panic("failed to register grpc gateway")
logging.Log("SERVE-7B7G0E").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("failed to register grpc gateway")
return addInterceptors(mux, g)
}
@@ -116,8 +117,10 @@ func createMux(g Gateway, customHeaders ...string) *runtime.ServeMux {
}
func createDialOptions(g Gateway) []grpc.DialOption {
opts := []grpc.DialOption{grpc.WithInsecure()}
opts = append(opts, client_middleware.DefaultTracingStatsClient())
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(client_middleware.DefaultTracingClient()),
}
if customOpts, ok := g.(gatewayCustomCallOptions); ok {
opts = append(opts, customOpts.GatewayCallOptions()...)

View File

@@ -20,15 +20,14 @@ func AuthorizationInterceptor(verifier *authz.TokenVerifier, authConfig authz.Co
}
func authorize(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, verifier *authz.TokenVerifier, authConfig authz.Config) (_ interface{}, err error) {
ctx, span := tracing.NewServerInterceptorSpan(ctx)
defer func() { span.EndWithError(err) }()
authOpt, needsToken := verifier.CheckAuthMethod(info.FullMethod)
if !needsToken {
span.End()
return handler(ctx, req)
}
ctx, span := tracing.NewServerInterceptorSpan(ctx)
defer func() { span.EndWithError(err) }()
authToken := grpc_util.GetAuthorizationHeader(ctx)
if authToken == "" {
return nil, status.Error(codes.Unauthenticated, "auth header missing")

View File

@@ -4,45 +4,30 @@ import (
"context"
"strings"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
grpc_utils "github.com/caos/zitadel/internal/api/grpc"
"github.com/caos/zitadel/internal/tracing"
grpc_trace "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)
type GRPCMethod string
func TracingStatsServer(ignoredMethods ...GRPCMethod) grpc.ServerOption {
return grpc.StatsHandler(
&tracingServerHandler{
ignoredMethods,
ocgrpc.ServerHandler{
StartOptions: trace.StartOptions{
Sampler: tracing.Sampler(),
SpanKind: trace.SpanKindServer,
},
},
},
)
func DefaultTracingServer() grpc.UnaryServerInterceptor {
return TracingServer(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func DefaultTracingStatsServer() grpc.ServerOption {
return TracingStatsServer(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func TracingServer(ignoredMethods ...GRPCMethod) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
type tracingServerHandler struct {
IgnoredMethods []GRPCMethod
ocgrpc.ServerHandler
}
func (s *tracingServerHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
for _, method := range s.IgnoredMethods {
if strings.HasSuffix(tagInfo.FullMethodName, string(method)) {
return ctx
for _, ignoredMethod := range ignoredMethods {
if strings.HasSuffix(info.FullMethod, string(ignoredMethod)) {
return handler(ctx, req)
}
}
return grpc_trace.UnaryServerInterceptor()(ctx, req, info, handler)
}
return s.ServerHandler.TagRPC(ctx, tagInfo)
}

View File

@@ -1,71 +0,0 @@
package middleware
import (
"context"
"testing"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc/stats"
)
func Test_tracingServerHandler_TagRPC(t *testing.T) {
type fields struct {
IgnoredMethods []GRPCMethod
ServerHandler ocgrpc.ServerHandler
}
type args struct {
ctx context.Context
tagInfo *stats.RPCTagInfo
}
type res struct {
wantSpan bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
"ignored method",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ServerHandler: ocgrpc.ServerHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "ignore",
},
},
res{false},
},
{
"tag",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ServerHandler: ocgrpc.ServerHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "tag",
},
},
res{true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &tracingServerHandler{
IgnoredMethods: tt.fields.IgnoredMethods,
ServerHandler: tt.fields.ServerHandler,
}
got := s.TagRPC(tt.args.ctx, tt.args.tagInfo)
if (trace.FromContext(got) != nil) != tt.res.wantSpan {
t.Errorf("TagRPC() = %v, want %v", got, tt.res.wantSpan)
}
})
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/proto"
"github.com/caos/zitadel/internal/tracing"
)
type ValidationFunction func(ctx context.Context) error
@@ -41,7 +42,7 @@ func validate(ctx context.Context, validations map[string]ValidationFunction) ma
errors := make(map[string]error)
for id, validation := range validations {
if err := validation(ctx); err != nil {
logging.Log("API-vf823").WithError(err).Error("validation failed")
logging.Log("API-vf823").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Error("validation failed")
errors[id] = err
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/api/grpc/server/middleware"
"github.com/caos/zitadel/internal/api/http"
"github.com/caos/zitadel/internal/tracing"
)
const (
@@ -27,9 +28,9 @@ type Server interface {
func CreateServer(verifier *authz.TokenVerifier, authConfig authz.Config, lang language.Tag) *grpc.Server {
return grpc.NewServer(
middleware.DefaultTracingStatsServer(),
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
middleware.DefaultTracingServer(),
middleware.ErrorHandler(),
middleware.AuthorizationInterceptor(verifier, authConfig),
middleware.TranslationHandler(lang),
@@ -49,9 +50,9 @@ func Serve(ctx context.Context, server *grpc.Server, port string) {
go func() {
listener := http.CreateListener(port)
err := server.Serve(listener)
logging.Log("SERVE-Ga3e94").OnError(err).Panic("grpc server serve failed")
logging.Log("SERVE-Ga3e94").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("grpc server serve failed")
}()
logging.LogWithFields("SERVE-bZ44QM", "port", port).Info("grpc server is listening")
logging.LogWithFields("SERVE-bZ44QM", "port", port).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Info("grpc server is listening")
}
func grpcPort(port string) string {