fix(tracing): from opencensus to opentelemetry (#937)

* refactor: switch from opencensus to opentelemetry

* tempo works as designed nooooot

* fix: log traceids

* with grafana agent

* fix: http tracing

* fix: cleanup files

* chore: remove todo

* fix: bad test

* fix: ignore methods in grpc interceptors

* fix: remove test log

* clean up

* typo

* fix(config): configure tracing endpoint

* fix(span): add error id to span
This commit is contained in:
Silvan
2020-11-20 07:57:39 +01:00
committed by GitHub
parent fcf81bed5f
commit 168242e725
47 changed files with 412 additions and 796 deletions

View File

@@ -10,13 +10,13 @@ import (
"github.com/caos/zitadel/internal/config/systemdefaults"
es_models "github.com/caos/zitadel/internal/eventstore/models"
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/view/model"
org_es "github.com/caos/zitadel/internal/org/repository/eventsourcing"
usr_model "github.com/caos/zitadel/internal/user/model"
usr_es "github.com/caos/zitadel/internal/user/repository/eventsourcing"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_es "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/view/model"
org_es "github.com/caos/zitadel/internal/org/repository/eventsourcing"
"github.com/caos/zitadel/internal/tracing"
usr_model "github.com/caos/zitadel/internal/user/model"
usr_es "github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
type IAMRepository struct {
@@ -55,7 +55,7 @@ func (repo *IAMRepository) RemoveIAMMember(ctx context.Context, userID string) e
func (repo *IAMRepository) SearchIAMMembers(ctx context.Context, request *iam_model.IAMMemberSearchRequest) (*iam_model.IAMMemberSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestIAMMemberSequence()
logging.Log("EVENT-Slkci").OnError(err).Warn("could not read latest iam sequence")
logging.Log("EVENT-Slkci").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest iam sequence")
members, count, err := repo.View.SearchIAMMembers(request)
if err != nil {
return nil, err
@@ -157,7 +157,7 @@ func (repo *IAMRepository) ChangeOidcIDPConfig(ctx context.Context, oidcConfig *
func (repo *IAMRepository) SearchIDPConfigs(ctx context.Context, request *iam_model.IDPConfigSearchRequest) (*iam_model.IDPConfigSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestIDPConfigSequence()
logging.Log("EVENT-Dk8si").OnError(err).Warn("could not read latest idp config sequence")
logging.Log("EVENT-Dk8si").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest idp config sequence")
idps, count, err := repo.View.SearchIDPConfigs(request)
if err != nil {
return nil, err
@@ -249,7 +249,7 @@ func (repo *IAMRepository) SearchDefaultIDPProviders(ctx context.Context, reques
request.EnsureLimit(repo.SearchLimit)
request.AppendAggregateIDQuery(repo.SystemDefaults.IamID)
sequence, err := repo.View.GetLatestIDPProviderSequence()
logging.Log("EVENT-Tuiks").OnError(err).Warn("could not read latest iam sequence")
logging.Log("EVENT-Tuiks").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest iam sequence")
providers, count, err := repo.View.SearchIDPProviders(request)
if err != nil {
return nil, err

View File

@@ -2,9 +2,11 @@ package eventstore
import (
"context"
"github.com/caos/zitadel/internal/errors"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_view "github.com/caos/zitadel/internal/iam/repository/view/model"
"github.com/caos/zitadel/internal/tracing"
"github.com/caos/logging"
admin_model "github.com/caos/zitadel/internal/admin/model"
@@ -86,7 +88,7 @@ func (repo *OrgRepo) OrgByID(ctx context.Context, id string) (*org_model.Org, er
func (repo *OrgRepo) SearchOrgs(ctx context.Context, query *org_model.OrgSearchRequest) (*org_model.OrgSearchResult, error) {
query.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestOrgSequence()
logging.Log("EVENT-LXo9w").OnError(err).Warn("could not read latest iam sequence")
logging.Log("EVENT-LXo9w").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest iam sequence")
orgs, count, err := repo.View.SearchOrgs(query)
if err != nil {
return nil, err

View File

@@ -16,6 +16,7 @@ import (
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/errors"
iam_model "github.com/caos/zitadel/internal/iam/model"
"github.com/caos/zitadel/internal/tracing"
)
type Config struct {
@@ -96,7 +97,7 @@ func (a *API) healthHandler() http.Handler {
func handleHealth(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("ok"))
logging.Log("API-Hfss2").OnError(err).Error("error writing ok for health")
logging.Log("API-Hfss2").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(r.Context())).Error("error writing ok for health")
}
func handleReadiness(checks []ValidationFunction) func(w http.ResponseWriter, r *http.Request) {
@@ -124,7 +125,7 @@ type ValidationFunction func(ctx context.Context) error
func validate(ctx context.Context, validations []ValidationFunction) error {
for _, validation := range validations {
if err := validation(ctx); err != nil {
logging.Log("API-vf823").WithError(err).Error("validation failed")
logging.Log("API-vf823").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Error("validation failed")
return err
}
}

View File

@@ -3,6 +3,7 @@ package auth
import (
"context"
"encoding/json"
"github.com/caos/logging"
"github.com/golang/protobuf/ptypes"
"golang.org/x/text/language"
@@ -11,6 +12,7 @@ import (
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/tracing"
usr_model "github.com/caos/zitadel/internal/user/model"
"github.com/caos/zitadel/pkg/grpc/auth"
"github.com/caos/zitadel/pkg/grpc/message"
@@ -96,7 +98,7 @@ func profileViewFromModel(profile *usr_model.Profile) *auth.UserProfileView {
func updateProfileToModel(ctx context.Context, u *auth.UpdateUserProfileRequest) *usr_model.Profile {
preferredLanguage, err := language.Parse(u.PreferredLanguage)
logging.Log("GRPC-lk73L").OnError(err).Debug("language malformed")
logging.Log("GRPC-lk73L").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("language malformed")
return &usr_model.Profile{
ObjectRoot: models.ObjectRoot{AggregateID: authz.GetCtxData(ctx).UserID},

View File

@@ -1,68 +0,0 @@
package middleware
import (
"context"
"testing"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc/stats"
)
func Test_tracingClientHandler_TagRPC(t *testing.T) {
type fields struct {
IgnoredMethods []GRPCMethod
ClientHandler ocgrpc.ClientHandler
}
type args struct {
ctx context.Context
tagInfo *stats.RPCTagInfo
}
tests := []struct {
name string
fields fields
args args
wantSpan bool
}{
{
"ignored method",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ClientHandler: ocgrpc.ClientHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "ignore",
},
},
false,
},
{
"tag",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ClientHandler: ocgrpc.ClientHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "tag",
},
},
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &tracingClientHandler{
IgnoredMethods: tt.fields.IgnoredMethods,
ClientHandler: tt.fields.ClientHandler,
}
got := s.TagRPC(tt.args.ctx, tt.args.tagInfo)
if (trace.FromContext(got) != nil) != tt.wantSpan {
t.Errorf("TagRPC() = %v, want %v", got, tt.wantSpan)
}
})
}
}

View File

@@ -4,44 +4,32 @@ import (
"context"
"strings"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
grpc_utils "github.com/caos/zitadel/internal/api/grpc"
"github.com/caos/zitadel/internal/tracing"
grpc_trace "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)
type GRPCMethod string
func TracingStatsClient(ignoredMethods ...GRPCMethod) grpc.DialOption {
return grpc.WithStatsHandler(
&tracingClientHandler{
ignoredMethods,
ocgrpc.ClientHandler{
StartOptions: trace.StartOptions{
Sampler: tracing.Sampler(),
SpanKind: trace.SpanKindClient},
},
},
)
func DefaultTracingClient() grpc.UnaryClientInterceptor {
return TracingServer(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func DefaultTracingStatsClient() grpc.DialOption {
return TracingStatsClient(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func TracingServer(ignoredMethods ...GRPCMethod) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
type tracingClientHandler struct {
IgnoredMethods []GRPCMethod
ocgrpc.ClientHandler
}
func (s *tracingClientHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
for _, method := range s.IgnoredMethods {
if strings.HasSuffix(tagInfo.FullMethodName, string(method)) {
return ctx
for _, ignoredMethod := range ignoredMethods {
if strings.HasSuffix(method, string(ignoredMethod)) {
return invoker(ctx, method, req, reply, cc, opts...)
}
}
return grpc_trace.UnaryClientInterceptor()(ctx, method, req, reply, cc, invoker, opts...)
}
return s.ClientHandler.TagRPC(ctx, tagInfo)
}

View File

@@ -2,6 +2,7 @@ package errors
import (
"context"
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/pkg/grpc/message"

View File

@@ -14,6 +14,7 @@ import (
client_middleware "github.com/caos/zitadel/internal/api/grpc/client/middleware"
http_util "github.com/caos/zitadel/internal/api/http"
http_mw "github.com/caos/zitadel/internal/api/http/middleware"
"github.com/caos/zitadel/internal/tracing"
)
const (
@@ -103,7 +104,7 @@ func createGateway(ctx context.Context, g Gateway, port string, customHeaders ..
mux := createMux(g, customHeaders...)
opts := createDialOptions(g)
err := g.RegisterGateway()(ctx, mux, http_util.Endpoint(port), opts)
logging.Log("SERVE-7B7G0E").OnError(err).Panic("failed to register grpc gateway")
logging.Log("SERVE-7B7G0E").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("failed to register grpc gateway")
return addInterceptors(mux, g)
}
@@ -116,8 +117,10 @@ func createMux(g Gateway, customHeaders ...string) *runtime.ServeMux {
}
func createDialOptions(g Gateway) []grpc.DialOption {
opts := []grpc.DialOption{grpc.WithInsecure()}
opts = append(opts, client_middleware.DefaultTracingStatsClient())
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(client_middleware.DefaultTracingClient()),
}
if customOpts, ok := g.(gatewayCustomCallOptions); ok {
opts = append(opts, customOpts.GatewayCallOptions()...)

View File

@@ -20,15 +20,14 @@ func AuthorizationInterceptor(verifier *authz.TokenVerifier, authConfig authz.Co
}
func authorize(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, verifier *authz.TokenVerifier, authConfig authz.Config) (_ interface{}, err error) {
ctx, span := tracing.NewServerInterceptorSpan(ctx)
defer func() { span.EndWithError(err) }()
authOpt, needsToken := verifier.CheckAuthMethod(info.FullMethod)
if !needsToken {
span.End()
return handler(ctx, req)
}
ctx, span := tracing.NewServerInterceptorSpan(ctx)
defer func() { span.EndWithError(err) }()
authToken := grpc_util.GetAuthorizationHeader(ctx)
if authToken == "" {
return nil, status.Error(codes.Unauthenticated, "auth header missing")

View File

@@ -4,45 +4,30 @@ import (
"context"
"strings"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/stats"
grpc_utils "github.com/caos/zitadel/internal/api/grpc"
"github.com/caos/zitadel/internal/tracing"
grpc_trace "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)
type GRPCMethod string
func TracingStatsServer(ignoredMethods ...GRPCMethod) grpc.ServerOption {
return grpc.StatsHandler(
&tracingServerHandler{
ignoredMethods,
ocgrpc.ServerHandler{
StartOptions: trace.StartOptions{
Sampler: tracing.Sampler(),
SpanKind: trace.SpanKindServer,
},
},
},
)
func DefaultTracingServer() grpc.UnaryServerInterceptor {
return TracingServer(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func DefaultTracingStatsServer() grpc.ServerOption {
return TracingStatsServer(grpc_utils.Healthz, grpc_utils.Readiness, grpc_utils.Validation)
}
func TracingServer(ignoredMethods ...GRPCMethod) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
type tracingServerHandler struct {
IgnoredMethods []GRPCMethod
ocgrpc.ServerHandler
}
func (s *tracingServerHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
for _, method := range s.IgnoredMethods {
if strings.HasSuffix(tagInfo.FullMethodName, string(method)) {
return ctx
for _, ignoredMethod := range ignoredMethods {
if strings.HasSuffix(info.FullMethod, string(ignoredMethod)) {
return handler(ctx, req)
}
}
return grpc_trace.UnaryServerInterceptor()(ctx, req, info, handler)
}
return s.ServerHandler.TagRPC(ctx, tagInfo)
}

View File

@@ -1,71 +0,0 @@
package middleware
import (
"context"
"testing"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc/stats"
)
func Test_tracingServerHandler_TagRPC(t *testing.T) {
type fields struct {
IgnoredMethods []GRPCMethod
ServerHandler ocgrpc.ServerHandler
}
type args struct {
ctx context.Context
tagInfo *stats.RPCTagInfo
}
type res struct {
wantSpan bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
"ignored method",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ServerHandler: ocgrpc.ServerHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "ignore",
},
},
res{false},
},
{
"tag",
fields{
IgnoredMethods: []GRPCMethod{"ignore"},
ServerHandler: ocgrpc.ServerHandler{},
},
args{
ctx: context.Background(),
tagInfo: &stats.RPCTagInfo{
FullMethodName: "tag",
},
},
res{true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &tracingServerHandler{
IgnoredMethods: tt.fields.IgnoredMethods,
ServerHandler: tt.fields.ServerHandler,
}
got := s.TagRPC(tt.args.ctx, tt.args.tagInfo)
if (trace.FromContext(got) != nil) != tt.res.wantSpan {
t.Errorf("TagRPC() = %v, want %v", got, tt.res.wantSpan)
}
})
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/proto"
"github.com/caos/zitadel/internal/tracing"
)
type ValidationFunction func(ctx context.Context) error
@@ -41,7 +42,7 @@ func validate(ctx context.Context, validations map[string]ValidationFunction) ma
errors := make(map[string]error)
for id, validation := range validations {
if err := validation(ctx); err != nil {
logging.Log("API-vf823").WithError(err).Error("validation failed")
logging.Log("API-vf823").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Error("validation failed")
errors[id] = err
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/api/grpc/server/middleware"
"github.com/caos/zitadel/internal/api/http"
"github.com/caos/zitadel/internal/tracing"
)
const (
@@ -27,9 +28,9 @@ type Server interface {
func CreateServer(verifier *authz.TokenVerifier, authConfig authz.Config, lang language.Tag) *grpc.Server {
return grpc.NewServer(
middleware.DefaultTracingStatsServer(),
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(
middleware.DefaultTracingServer(),
middleware.ErrorHandler(),
middleware.AuthorizationInterceptor(verifier, authConfig),
middleware.TranslationHandler(lang),
@@ -49,9 +50,9 @@ func Serve(ctx context.Context, server *grpc.Server, port string) {
go func() {
listener := http.CreateListener(port)
err := server.Serve(listener)
logging.Log("SERVE-Ga3e94").OnError(err).Panic("grpc server serve failed")
logging.Log("SERVE-Ga3e94").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("grpc server serve failed")
}()
logging.LogWithFields("SERVE-bZ44QM", "port", port).Info("grpc server is listening")
logging.LogWithFields("SERVE-bZ44QM", "port", port).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Info("grpc server is listening")
}
func grpcPort(port string) string {

View File

@@ -8,7 +8,7 @@ import (
)
func DefaultTraceHandler(handler http.Handler) http.Handler {
return tracing.TraceHandler(handler, http_utils.Probes...)
return TraceHandler(http_utils.Probes...)(handler)
}
func TraceHandler(ignoredMethods ...string) func(http.Handler) http.Handler {

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/tracing"
)
func Serve(ctx context.Context, handler http.Handler, port, servername string) {
@@ -17,14 +18,14 @@ func Serve(ctx context.Context, handler http.Handler, port, servername string) {
go func() {
<-ctx.Done()
err := server.Shutdown(ctx)
logging.LogWithFields("HTTP-m7kBlq", "name", servername).OnError(err).Warnf("error during graceful shutdown of http server (%s)", servername)
logging.LogWithFields("HTTP-m7kBlq", "name", servername).WithField("traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warnf("error during graceful shutdown of http server (%s)", servername)
}()
go func() {
err := server.Serve(listener)
logging.LogWithFields("HTTP-tBHR60", "name", servername).OnError(err).Panicf("http serve (%s) failed", servername)
logging.LogWithFields("HTTP-tBHR60", "name", servername).OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panicf("http serve (%s) failed", servername)
}()
logging.LogWithFields("HTTP-KHh0Cb", "name", servername, "port", port).Infof("http server (%s) is listening", servername)
logging.LogWithFields("HTTP-KHh0Cb", "name", servername, "port", port).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Infof("http server (%s) is listening", servername)
}
func RegisterHandler(mux *http.ServeMux, prefix string, handler http.Handler) {

View File

@@ -12,6 +12,7 @@ import (
"github.com/caos/zitadel/internal/auth/repository"
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/id"
"github.com/caos/zitadel/internal/tracing"
)
type OPHandlerConfig struct {
@@ -52,7 +53,7 @@ type OPStorage struct {
func NewProvider(ctx context.Context, config OPHandlerConfig, repo repository.Repository, localDevMode bool) op.OpenIDProvider {
cookieHandler, err := middleware.NewUserAgentHandler(config.UserAgentCookieConfig, id.SonyFlakeGenerator, localDevMode)
logging.Log("OIDC-sd4fd").OnError(err).Panic("cannot user agent handler")
logging.Log("OIDC-sd4fd").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("cannot user agent handler")
config.OPConfig.CodeMethodS256 = true
provider, err := op.NewOpenIDProvider(
ctx,
@@ -71,7 +72,7 @@ func NewProvider(ctx context.Context, config OPHandlerConfig, repo repository.Re
op.WithCustomKeysEndpoint(op.NewEndpointWithURL(config.Endpoints.Keys.Path, config.Endpoints.Keys.URL)),
op.WithRetry(3, time.Duration(30*time.Second)),
)
logging.Log("OIDC-asf13").OnError(err).Panic("cannot create provider")
logging.Log("OIDC-asf13").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Panic("cannot create provider")
return provider
}

View File

@@ -112,7 +112,7 @@ func (repo *AuthRequestRepo) CreateAuthRequest(ctx context.Context, request *mod
request.AppendAudIfNotExisting(app.ProjectID)
if request.LoginHint != "" {
err = repo.checkLoginName(ctx, request, request.LoginHint)
logging.LogWithFields("EVENT-aG311", "login name", request.LoginHint, "id", request.ID, "applicationID", request.ApplicationID).OnError(err).Debug("login hint invalid")
logging.LogWithFields("EVENT-aG311", "login name", request.LoginHint, "id", request.ID, "applicationID", request.ApplicationID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Debug("login hint invalid")
}
err = repo.AuthRequests.SaveAuthRequest(ctx, request)
if err != nil {
@@ -723,7 +723,7 @@ func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eve
}
events, err := eventProvider.UserEventsByID(ctx, user.ID, session.Sequence)
if err != nil {
logging.Log("EVENT-Hse6s").WithError(err).Debug("error retrieving new events")
logging.Log("EVENT-Hse6s").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events")
return user_view_model.UserSessionToModel(session), nil
}
sessionCopy := *session
@@ -744,7 +744,7 @@ func userSessionByIDs(ctx context.Context, provider userSessionViewProvider, eve
es_model.HumanSignedOut:
eventData, err := user_view_model.UserSessionFromEvent(event)
if err != nil {
logging.Log("EVENT-sdgT3").WithError(err).Debug("error getting event data")
logging.Log("EVENT-sdgT3").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error getting event data")
return user_view_model.UserSessionToModel(session), nil
}
if eventData.UserAgentID != agentID {
@@ -793,7 +793,7 @@ func userByID(ctx context.Context, viewProvider userViewProvider, eventProvider
}
events, err := eventProvider.UserEventsByID(ctx, userID, user.Sequence)
if err != nil {
logging.Log("EVENT-dfg42").WithError(err).Debug("error retrieving new events")
logging.Log("EVENT-dfg42").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events")
return user_view_model.UserToModel(user), nil
}
if len(events) == 0 {

View File

@@ -3,11 +3,12 @@ package eventstore
import (
"context"
"encoding/json"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_view_model "github.com/caos/zitadel/internal/iam/repository/view/model"
"testing"
"time"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_view_model "github.com/caos/zitadel/internal/iam/repository/view/model"
"github.com/stretchr/testify/assert"
"github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
@@ -1296,7 +1297,6 @@ func Test_userSessionByIDs(t *testing.T) {
func Test_userByID(t *testing.T) {
type args struct {
ctx context.Context
viewProvider userViewProvider
eventProvider userEventProvider
userID string
@@ -1394,7 +1394,7 @@ func Test_userByID(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := userByID(tt.args.ctx, tt.args.viewProvider, tt.args.eventProvider, tt.args.userID)
got, err := userByID(context.Background(), tt.args.viewProvider, tt.args.eventProvider, tt.args.userID)
if (err != nil && tt.wantErr == nil) || (tt.wantErr != nil && !tt.wantErr(err)) {
t.Errorf("nextSteps() wrong error = %v", err)
return

View File

@@ -2,12 +2,14 @@ package eventstore
import (
"context"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/errors"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_view_model "github.com/caos/zitadel/internal/iam/repository/view/model"
"github.com/caos/zitadel/internal/tracing"
auth_model "github.com/caos/zitadel/internal/auth/model"
auth_view "github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
@@ -35,7 +37,7 @@ type OrgRepository struct {
func (repo *OrgRepository) SearchOrgs(ctx context.Context, request *org_model.OrgSearchRequest) (*org_model.OrgSearchResult, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestOrgSequence()
logging.Log("EVENT-7Udhz").OnError(err).Warn("could not read latest org sequence")
logging.Log("EVENT-7Udhz").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest org sequence")
members, count, err := repo.View.SearchOrgs(request)
if err != nil {
return nil, err

View File

@@ -2,17 +2,18 @@ package eventstore
import (
"context"
"github.com/caos/logging"
auth_req_model "github.com/caos/zitadel/internal/auth_request/model"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
usr_model "github.com/caos/zitadel/internal/user/model"
user_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"github.com/caos/zitadel/internal/user/repository/view/model"
"strings"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
auth_req_model "github.com/caos/zitadel/internal/auth_request/model"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/tracing"
usr_model "github.com/caos/zitadel/internal/user/model"
user_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"github.com/caos/zitadel/internal/user/repository/view/model"
)
type TokenRepo struct {
@@ -76,7 +77,7 @@ func (repo *TokenRepo) TokenByID(ctx context.Context, userID, tokenID string) (*
}
if esErr != nil {
logging.Log("EVENT-5Nm9s").WithError(viewErr).Debug("error retrieving new events")
logging.Log("EVENT-5Nm9s").WithError(viewErr).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events")
return model.TokenViewToModel(token), nil
}
viewToken := *token

View File

@@ -110,7 +110,7 @@ func (repo *UserRepo) ChangeMyProfile(ctx context.Context, profile *model.Profil
func (repo *UserRepo) SearchMyExternalIDPs(ctx context.Context, request *model.ExternalIDPSearchRequest) (*model.ExternalIDPSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, seqErr := repo.View.GetLatestExternalIDPSequence()
logging.Log("EVENT-5Jsi8").OnError(seqErr).Warn("could not read latest user sequence")
logging.Log("EVENT-5Jsi8").OnError(seqErr).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest user sequence")
request.AppendUserQuery(authz.GetCtxData(ctx).UserID)
externalIDPS, count, err := repo.View.SearchExternalIDPs(request)
if err != nil {
@@ -268,7 +268,7 @@ func (repo *UserRepo) AddMfaOTP(ctx context.Context, userID string) (*model.OTP,
accountName := ""
user, err := repo.UserByID(ctx, userID)
if err != nil {
logging.Log("EVENT-Fk93s").WithError(err).Debug("unable to get user for loginname")
logging.Log("EVENT-Fk93s").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("unable to get user for loginname")
} else {
accountName = user.PreferredLoginName
}
@@ -279,7 +279,7 @@ func (repo *UserRepo) AddMyMfaOTP(ctx context.Context) (*model.OTP, error) {
accountName := ""
user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID)
if err != nil {
logging.Log("EVENT-Ml0sd").WithError(err).Debug("unable to get user for loginname")
logging.Log("EVENT-Ml0sd").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("unable to get user for loginname")
} else {
accountName = user.PreferredLoginName
}
@@ -370,7 +370,7 @@ func (repo *UserRepo) UserByID(ctx context.Context, id string) (*model.UserView,
}
events, err := repo.UserEvents.UserEventsByID(ctx, id, user.Sequence)
if err != nil {
logging.Log("EVENT-PSoc3").WithError(err).Debug("error retrieving new events")
logging.Log("EVENT-PSoc3").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events")
return usr_view_model.UserToModel(user), nil
}
userCopy := *user

View File

@@ -12,6 +12,7 @@ import (
global_model "github.com/caos/zitadel/internal/model"
org_model "github.com/caos/zitadel/internal/org/model"
org_view_model "github.com/caos/zitadel/internal/org/repository/view/model"
"github.com/caos/zitadel/internal/tracing"
user_model "github.com/caos/zitadel/internal/user/model"
user_view_model "github.com/caos/zitadel/internal/user/repository/view/model"
grant_model "github.com/caos/zitadel/internal/usergrant/model"
@@ -29,7 +30,7 @@ type UserGrantRepo struct {
func (repo *UserGrantRepo) SearchMyUserGrants(ctx context.Context, request *grant_model.UserGrantSearchRequest) (*grant_model.UserGrantSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestUserGrantSequence()
logging.Log("EVENT-Hd7s3").OnError(err).Warn("could not read latest user grant sequence")
logging.Log("EVENT-Hd7s3").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest user grant sequence")
request.Queries = append(request.Queries, &grant_model.UserGrantSearchQuery{Key: grant_model.UserGrantSearchKeyUserID, Method: global_model.SearchMethodEquals, Value: authz.GetCtxData(ctx).UserID})
grants, count, err := repo.View.SearchUserGrants(request)
if err != nil {

View File

@@ -2,12 +2,13 @@ package eventstore
import (
"context"
"strings"
"time"
"github.com/caos/logging"
usr_model "github.com/caos/zitadel/internal/user/model"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"github.com/caos/zitadel/internal/user/repository/view/model"
"strings"
"time"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
"github.com/caos/zitadel/internal/crypto"
@@ -43,7 +44,7 @@ func (repo *TokenVerifierRepo) TokenByID(ctx context.Context, tokenID, userID st
}
if esErr != nil {
logging.Log("EVENT-5Nm9s").WithError(viewErr).Debug("error retrieving new events")
logging.Log("EVENT-5Nm9s").WithError(viewErr).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events")
return model.TokenViewToModel(token), nil
}
viewToken := *token

View File

@@ -8,6 +8,7 @@ import (
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/tracing"
)
type Querier interface {
@@ -55,7 +56,7 @@ func (db *SQL) LatestSequence(ctx context.Context, queryFactory *es_models.Searc
sequence := new(Sequence)
err := rowScanner(row.Scan, sequence)
if err != nil {
logging.Log("SQL-WsxTg").WithError(err).Info("query failed")
logging.Log("SQL-WsxTg").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Info("query failed")
return 0, errors.ThrowInternal(err, "SQL-Yczyx", "unable to filter latest sequence")
}
return uint64(*sequence), nil

View File

@@ -8,6 +8,7 @@ import (
"github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/tracing"
"github.com/cockroachdb/cockroach-go/v2/crdb"
)
@@ -25,7 +26,7 @@ func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggrega
stmt, err := tx.Prepare(insertStmt)
if err != nil {
tx.Rollback()
logging.Log("SQL-9ctx5").WithError(err).Warn("prepare failed")
logging.Log("SQL-9ctx5").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("prepare failed")
return caos_errs.ThrowInternal(err, "SQL-juCgA", "prepare failed")
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/query"
"github.com/caos/zitadel/internal/tracing"
"github.com/caos/zitadel/internal/view/repository"
"time"
@@ -74,7 +75,7 @@ func (s *spooledHandler) load(workerID string) {
errs <- err
} else {
errs <- s.process(ctx, events, workerID)
logging.Log("SPOOL-0pV8o").WithField("view", s.ViewModel()).WithField("worker", workerID).Debug("process done")
logging.Log("SPOOL-0pV8o").WithField("view", s.ViewModel()).WithField("worker", workerID).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("process done")
}
}
<-ctx.Done()
@@ -92,7 +93,7 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo
for _, event := range events {
select {
case <-ctx.Done():
logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID).Debug("context canceled")
logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled")
return nil
default:
if err := s.Reduce(event); err != nil {
@@ -110,7 +111,7 @@ func (s *spooledHandler) query(ctx context.Context) ([]*models.Event, error) {
}
factory := models.FactoryFromSearchQuery(query)
sequence, err := s.eventstore.LatestSequence(ctx, factory)
logging.Log("SPOOL-7SciK").OnError(err).Debug("unable to query latest sequence")
logging.Log("SPOOL-7SciK").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("unable to query latest sequence")
var processedSequence uint64
for _, filter := range query.Filters {
if filter.GetField() == models.Field_LatestSequence {

View File

@@ -22,6 +22,7 @@ import (
org_es "github.com/caos/zitadel/internal/org/repository/eventsourcing"
org_es_model "github.com/caos/zitadel/internal/org/repository/eventsourcing/model"
"github.com/caos/zitadel/internal/org/repository/view/model"
"github.com/caos/zitadel/internal/tracing"
usr_model "github.com/caos/zitadel/internal/user/model"
usr_es "github.com/caos/zitadel/internal/user/repository/eventsourcing"
)
@@ -108,7 +109,7 @@ func (repo *OrgRepository) SearchMyOrgDomains(ctx context.Context, request *org_
request.EnsureLimit(repo.SearchLimit)
request.Queries = append(request.Queries, &org_model.OrgDomainSearchQuery{Key: org_model.OrgDomainSearchKeyOrgID, Method: global_model.SearchMethodEquals, Value: authz.GetCtxData(ctx).OrgID})
sequence, sequenceErr := repo.View.GetLatestOrgDomainSequence()
logging.Log("EVENT-SLowp").OnError(sequenceErr).Warn("could not read latest org domain sequence")
logging.Log("EVENT-SLowp").OnError(sequenceErr).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest org domain sequence")
domains, count, err := repo.View.SearchOrgDomains(request)
if err != nil {
return nil, err

View File

@@ -5,8 +5,9 @@ import (
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/tracing"
tracing_g "github.com/caos/zitadel/internal/tracing/google"
tracing_log "github.com/caos/zitadel/internal/tracing/log"
"github.com/caos/zitadel/internal/tracing/google"
"github.com/caos/zitadel/internal/tracing/log"
"github.com/caos/zitadel/internal/tracing/otel"
)
type TracingConfig struct {
@@ -15,8 +16,9 @@ type TracingConfig struct {
}
var tracer = map[string]func() tracing.Config{
"google": func() tracing.Config { return &tracing_g.Config{} },
"log": func() tracing.Config { return &tracing_log.Config{} },
"otel": func() tracing.Config { return &otel.Config{} },
"google": func() tracing.Config { return &google.Config{} },
"log": func() tracing.Config { return &log.Config{} },
"none": func() tracing.Config { return &NoTracing{} },
"": func() tracing.Config { return &NoTracing{} },
}

View File

@@ -1,3 +0,0 @@
package tracing
//go:generate mockgen -package mock -destination mock/tracing_mock.go github.com/caos/zitadel/internal/tracing Tracer

View File

@@ -1,24 +0,0 @@
package google
import (
"go.opencensus.io/trace"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/tracing"
)
type Config struct {
ProjectID string
MetricPrefix string
Fraction float64
}
func (c *Config) NewTracer() error {
if !envIsSet() {
return errors.ThrowInvalidArgument(nil, "GOOGL-sdh3a", "env not properly set, GOOGLE_APPLICATION_CREDENTIALS is misconfigured or missing")
}
tracing.T = &Tracer{projectID: c.ProjectID, metricPrefix: c.MetricPrefix, sampler: trace.ProbabilitySampler(c.Fraction)}
return tracing.T.Start()
}

View File

@@ -0,0 +1,43 @@
package google
import (
"os"
"strings"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/tracing"
"github.com/caos/zitadel/internal/tracing/otel"
sdk_trace "go.opentelemetry.io/otel/sdk/trace"
)
type Config struct {
ProjectID string
MetricPrefix string
Fraction float64
}
type Tracer struct {
otel.Tracer
}
func (c *Config) NewTracer() error {
if !envIsSet() {
return errors.ThrowInvalidArgument(nil, "GOOGL-sdh3a", "env not properly set, GOOGLE_APPLICATION_CREDENTIALS is misconfigured or missing")
}
sampler := sdk_trace.ParentBased(sdk_trace.TraceIDRatioBased(c.Fraction))
exporter, err := texporter.NewExporter(texporter.WithProjectID(c.ProjectID))
if err != nil {
return err
}
tracing.T = &Tracer{Tracer: *(otel.NewTracer(c.MetricPrefix, sampler, exporter))}
return nil
}
func envIsSet() bool {
gAuthCred := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
return strings.Contains(gAuthCred, ".json")
}

View File

@@ -1,95 +0,0 @@
package google
import (
"context"
"net/http"
"os"
"strings"
"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/tracing"
)
type Tracer struct {
Exporter *stackdriver.Exporter
projectID string
metricPrefix string
sampler trace.Sampler
}
func (t *Tracer) Start() (err error) {
t.Exporter, err = stackdriver.NewExporter(stackdriver.Options{
ProjectID: t.projectID,
MetricPrefix: t.metricPrefix,
})
if err != nil {
return errors.ThrowInternal(err, "GOOGL-4dCnX", "unable to start exporter")
}
views := append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...)
views = append(views, ochttp.DefaultClientViews...)
views = append(views, ochttp.DefaultServerViews...)
if err = view.Register(views...); err != nil {
return errors.ThrowInternal(err, "GOOGL-Q6L6w", "unable to register view")
}
trace.RegisterExporter(t.Exporter)
trace.ApplyConfig(trace.Config{DefaultSampler: t.sampler})
return nil
}
func (t *Tracer) Sampler() trace.Sampler {
return t.sampler
}
func (t *Tracer) NewServerInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, trace.WithSpanKind(trace.SpanKindServer))
}
func (t *Tracer) NewServerSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, trace.WithSpanKind(trace.SpanKindServer))
}
func (t *Tracer) NewClientInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, trace.WithSpanKind(trace.SpanKindClient))
}
func (t *Tracer) NewClientSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, trace.WithSpanKind(trace.SpanKindClient))
}
func (t *Tracer) NewSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller)
}
func (t *Tracer) newSpan(ctx context.Context, caller string, options ...trace.StartOption) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, caller, options...)
}
func (t *Tracer) newSpanFromName(ctx context.Context, name string, options ...trace.StartOption) (context.Context, *tracing.Span) {
ctx, span := trace.StartSpan(ctx, name, options...)
return ctx, tracing.CreateSpan(span)
}
func (t *Tracer) NewSpanHTTP(r *http.Request, caller string) (*http.Request, *tracing.Span) {
ctx, span := t.NewSpan(r.Context(), caller)
r = r.WithContext(ctx)
return r, span
}
func envIsSet() bool {
gAuthCred := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
return strings.Contains(gAuthCred, ".json")
}
func (t *Tracer) SetErrStatus(span *trace.Span, code int32, err error, obj ...string) {
span.SetStatus(trace.Status{Code: code, Message: err.Error() + strings.Join(obj, ", ")})
}

View File

@@ -4,29 +4,28 @@ import (
"net/http"
"strings"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/trace"
http_trace "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
func TraceHandler(handler http.Handler, ignoredEndpoints ...string) http.Handler {
return &ochttp.Handler{
Handler: handler,
FormatSpanName: func(r *http.Request) string {
host := r.URL.Host
if host == "" {
host = r.Host
func shouldNotIgnore(endpoints ...string) func(r *http.Request) bool {
return func(r *http.Request) bool {
for _, endpoint := range endpoints {
if strings.HasPrefix(r.URL.RequestURI(), endpoint) {
return false
}
return host + r.URL.Path
},
StartOptions: trace.StartOptions{Sampler: Sampler()},
IsHealthEndpoint: func(r *http.Request) bool {
for _, endpoint := range ignoredEndpoints {
if strings.HasPrefix(r.URL.RequestURI(), endpoint) {
return true
}
}
return false
},
}
return true
}
}
func TraceHandler(handler http.Handler, ignoredEndpoints ...string) http.Handler {
return http_trace.NewHandler(handler,
"zitadel",
http_trace.WithFilter(shouldNotIgnore(ignoredEndpoints...)),
http_trace.WithPublicEndpoint(),
http_trace.WithSpanNameFormatter(spanNameFormatter))
}
func spanNameFormatter(_ string, r *http.Request) string {
return r.Host + r.URL.EscapedPath()
}

View File

@@ -1,21 +1,28 @@
package log
import (
"go.opencensus.io/trace"
"github.com/caos/zitadel/internal/tracing"
"github.com/caos/zitadel/internal/tracing/otel"
"go.opentelemetry.io/otel/exporters/stdout"
sdk_trace "go.opentelemetry.io/otel/sdk/trace"
)
type Config struct {
Fraction float64
Fraction float64
MetricPrefix string
}
type Tracer struct {
otel.Tracer
}
func (c *Config) NewTracer() error {
if c.Fraction < 1 {
c.Fraction = 1
sampler := sdk_trace.ParentBased(sdk_trace.TraceIDRatioBased(c.Fraction))
exporter, err := stdout.NewExporter(stdout.WithPrettyPrint())
if err != nil {
return err
}
tracing.T = &Tracer{trace.ProbabilitySampler(c.Fraction)}
return tracing.T.Start()
tracing.T = &Tracer{Tracer: *(otel.NewTracer(c.MetricPrefix, sampler, exporter))}
return nil
}

View File

@@ -1,74 +0,0 @@
package log
import (
"context"
"net/http"
"go.opencensus.io/examples/exporter"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/tracing"
)
type Tracer struct {
sampler trace.Sampler
}
func (t *Tracer) Start() error {
trace.RegisterExporter(&exporter.PrintExporter{})
views := append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...)
views = append(views, ochttp.DefaultClientViews...)
views = append(views, ochttp.DefaultServerViews...)
if err := view.Register(views...); err != nil {
return errors.ThrowInternal(err, "LOG-PoFiB", "unable to register view")
}
trace.ApplyConfig(trace.Config{DefaultSampler: t.sampler})
return nil
}
func (t *Tracer) Sampler() trace.Sampler {
return t.sampler
}
func (t *Tracer) NewServerInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, trace.WithSpanKind(trace.SpanKindServer))
}
func (t *Tracer) NewServerSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, trace.WithSpanKind(trace.SpanKindServer))
}
func (t *Tracer) NewClientInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, trace.WithSpanKind(trace.SpanKindClient))
}
func (t *Tracer) NewClientSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, trace.WithSpanKind(trace.SpanKindClient))
}
func (t *Tracer) NewSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller)
}
func (t *Tracer) newSpan(ctx context.Context, caller string, options ...trace.StartOption) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, caller, options...)
}
func (t *Tracer) newSpanFromName(ctx context.Context, name string, options ...trace.StartOption) (context.Context, *tracing.Span) {
ctx, span := trace.StartSpan(ctx, name, options...)
return ctx, tracing.CreateSpan(span)
}
func (t *Tracer) NewSpanHTTP(r *http.Request, caller string) (*http.Request, *tracing.Span) {
ctx, span := t.NewSpan(r.Context(), caller)
r = r.WithContext(ctx)
return r, span
}

View File

@@ -1,155 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/caos/zitadel/internal/tracing (interfaces: Tracer)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
tracing "github.com/caos/zitadel/internal/tracing"
gomock "github.com/golang/mock/gomock"
trace "go.opencensus.io/trace"
http "net/http"
reflect "reflect"
)
// MockTracer is a mock of Tracer interface
type MockTracer struct {
ctrl *gomock.Controller
recorder *MockTracerMockRecorder
}
// MockTracerMockRecorder is the mock recorder for MockTracer
type MockTracerMockRecorder struct {
mock *MockTracer
}
// NewMockTracer creates a new mock instance
func NewMockTracer(ctrl *gomock.Controller) *MockTracer {
mock := &MockTracer{ctrl: ctrl}
mock.recorder = &MockTracerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockTracer) EXPECT() *MockTracerMockRecorder {
return m.recorder
}
// NewClientInterceptorSpan mocks base method
func (m *MockTracer) NewClientInterceptorSpan(arg0 context.Context, arg1 string) (context.Context, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewClientInterceptorSpan", arg0, arg1)
ret0, _ := ret[0].(context.Context)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewClientInterceptorSpan indicates an expected call of NewClientInterceptorSpan
func (mr *MockTracerMockRecorder) NewClientInterceptorSpan(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClientInterceptorSpan", reflect.TypeOf((*MockTracer)(nil).NewClientInterceptorSpan), arg0, arg1)
}
// NewClientSpan mocks base method
func (m *MockTracer) NewClientSpan(arg0 context.Context, arg1 string) (context.Context, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewClientSpan", arg0, arg1)
ret0, _ := ret[0].(context.Context)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewClientSpan indicates an expected call of NewClientSpan
func (mr *MockTracerMockRecorder) NewClientSpan(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClientSpan", reflect.TypeOf((*MockTracer)(nil).NewClientSpan), arg0, arg1)
}
// NewServerInterceptorSpan mocks base method
func (m *MockTracer) NewServerInterceptorSpan(arg0 context.Context, arg1 string) (context.Context, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewServerInterceptorSpan", arg0, arg1)
ret0, _ := ret[0].(context.Context)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewServerInterceptorSpan indicates an expected call of NewServerInterceptorSpan
func (mr *MockTracerMockRecorder) NewServerInterceptorSpan(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewServerInterceptorSpan", reflect.TypeOf((*MockTracer)(nil).NewServerInterceptorSpan), arg0, arg1)
}
// NewServerSpan mocks base method
func (m *MockTracer) NewServerSpan(arg0 context.Context, arg1 string) (context.Context, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewServerSpan", arg0, arg1)
ret0, _ := ret[0].(context.Context)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewServerSpan indicates an expected call of NewServerSpan
func (mr *MockTracerMockRecorder) NewServerSpan(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewServerSpan", reflect.TypeOf((*MockTracer)(nil).NewServerSpan), arg0, arg1)
}
// NewSpan mocks base method
func (m *MockTracer) NewSpan(arg0 context.Context, arg1 string) (context.Context, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewSpan", arg0, arg1)
ret0, _ := ret[0].(context.Context)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewSpan indicates an expected call of NewSpan
func (mr *MockTracerMockRecorder) NewSpan(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSpan", reflect.TypeOf((*MockTracer)(nil).NewSpan), arg0, arg1)
}
// NewSpanHTTP mocks base method
func (m *MockTracer) NewSpanHTTP(arg0 *http.Request, arg1 string) (*http.Request, *tracing.Span) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewSpanHTTP", arg0, arg1)
ret0, _ := ret[0].(*http.Request)
ret1, _ := ret[1].(*tracing.Span)
return ret0, ret1
}
// NewSpanHTTP indicates an expected call of NewSpanHTTP
func (mr *MockTracerMockRecorder) NewSpanHTTP(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSpanHTTP", reflect.TypeOf((*MockTracer)(nil).NewSpanHTTP), arg0, arg1)
}
// Sampler mocks base method
func (m *MockTracer) Sampler() trace.Sampler {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Sampler")
ret0, _ := ret[0].(trace.Sampler)
return ret0
}
// Sampler indicates an expected call of Sampler
func (mr *MockTracerMockRecorder) Sampler() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sampler", reflect.TypeOf((*MockTracer)(nil).Sampler))
}
// Start mocks base method
func (m *MockTracer) Start() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Start")
ret0, _ := ret[0].(error)
return ret0
}
// Start indicates an expected call of Start
func (mr *MockTracerMockRecorder) Start() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTracer)(nil).Start))
}

View File

@@ -1,20 +0,0 @@
package mock
import (
"context"
"testing"
"github.com/golang/mock/gomock"
"github.com/caos/zitadel/internal/tracing"
)
func NewSimpleMockTracer(t *testing.T) *MockTracer {
return NewMockTracer(gomock.NewController(t))
}
func ExpectServerSpan(ctx context.Context, mock interface{}) {
m := mock.(*MockTracer)
any := gomock.Any()
m.EXPECT().NewServerSpan(any, any).AnyTimes().Return(ctx, &tracing.Span{})
}

View File

@@ -0,0 +1,24 @@
package otel
import (
"github.com/caos/zitadel/internal/tracing"
"go.opentelemetry.io/otel/exporters/otlp"
sdk_trace "go.opentelemetry.io/otel/sdk/trace"
)
type Config struct {
Fraction float64
MetricPrefix string
Endpoint string
}
func (c *Config) NewTracer() error {
sampler := sdk_trace.ParentBased(sdk_trace.TraceIDRatioBased(c.Fraction))
exporter, err := otlp.NewExporter(otlp.WithAddress(c.Endpoint), otlp.WithInsecure())
if err != nil {
return err
}
tracing.T = NewTracer(c.MetricPrefix, sampler, exporter)
return nil
}

View File

@@ -0,0 +1,70 @@
package otel
import (
"context"
"net/http"
"github.com/caos/zitadel/internal/tracing"
"go.opentelemetry.io/otel/api/global"
api_trace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/propagators"
"go.opentelemetry.io/otel/sdk/export/trace"
sdk_trace "go.opentelemetry.io/otel/sdk/trace"
)
type Tracer struct {
Exporter api_trace.Tracer
sampler sdk_trace.Sampler
}
func NewTracer(name string, sampler sdk_trace.Sampler, exporter trace.SpanExporter) *Tracer {
tp := sdk_trace.NewTracerProvider(
sdk_trace.WithConfig(sdk_trace.Config{DefaultSampler: sampler}),
sdk_trace.WithSyncer(exporter),
)
global.SetTracerProvider(tp)
tc := propagators.TraceContext{}
global.SetTextMapPropagator(tc)
return &Tracer{Exporter: tp.Tracer(name), sampler: sampler}
}
func (t *Tracer) Sampler() sdk_trace.Sampler {
return t.sampler
}
func (t *Tracer) NewServerInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, api_trace.WithSpanKind(api_trace.SpanKindServer))
}
func (t *Tracer) NewServerSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, api_trace.WithSpanKind(api_trace.SpanKindServer))
}
func (t *Tracer) NewClientInterceptorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, name, api_trace.WithSpanKind(api_trace.SpanKindClient))
}
func (t *Tracer) NewClientSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller, api_trace.WithSpanKind(api_trace.SpanKindClient))
}
func (t *Tracer) NewSpan(ctx context.Context, caller string) (context.Context, *tracing.Span) {
return t.newSpan(ctx, caller)
}
func (t *Tracer) newSpan(ctx context.Context, caller string, options ...api_trace.SpanOption) (context.Context, *tracing.Span) {
return t.newSpanFromName(ctx, caller, options...)
}
func (t *Tracer) newSpanFromName(ctx context.Context, name string, options ...api_trace.SpanOption) (context.Context, *tracing.Span) {
ctx, span := t.Exporter.Start(ctx, name, options...)
return ctx, tracing.CreateSpan(span)
}
func (t *Tracer) NewSpanHTTP(r *http.Request, caller string) (*http.Request, *tracing.Span) {
ctx, span := t.NewSpan(r.Context(), caller)
r = r.WithContext(ctx)
return r, span
}

View File

@@ -1,30 +1,29 @@
package tracing
import (
"fmt"
errors2 "github.com/caos/zitadel/internal/api/grpc/errors"
"strconv"
"context"
"go.opencensus.io/trace"
"github.com/caos/zitadel/internal/errors"
grpc_errs "github.com/caos/zitadel/internal/api/grpc/errors"
api_trace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/label"
)
type Span struct {
span *trace.Span
attributes []trace.Attribute
span api_trace.Span
opts []api_trace.SpanOption
}
func CreateSpan(span *trace.Span) *Span {
return &Span{span: span, attributes: []trace.Attribute{}}
func CreateSpan(span api_trace.Span) *Span {
return &Span{span: span, opts: []api_trace.SpanOption{}}
}
func (s *Span) End() {
if s.span == nil {
return
}
s.span.AddAttributes(s.attributes...)
s.span.End()
s.span.End(s.opts...)
}
func (s *Span) EndWithError(err error) {
@@ -36,53 +35,10 @@ func (s *Span) SetStatusByError(err error) {
if s.span == nil {
return
}
s.span.SetStatus(statusFromError(err))
}
func statusFromError(err error) trace.Status {
code, msg, _, _ := errors2.ExtractCaosError(err)
return trace.Status{Code: int32(code), Message: msg}
}
// AddAnnotation creates an annotation. The annotation will not be added to the tracing use Annotate(msg) afterwards
func (s *Span) AddAnnotation(key string, value interface{}) *Span {
attribute, err := toTraceAttribute(key, value)
if err != nil {
return s
s.span.RecordError(context.TODO(), err, api_trace.WithErrorStatus(codes.Error))
}
s.attributes = append(s.attributes, attribute)
return s
}
// Annotate creates an annotation in tracing. Before added annotations will be set
func (s *Span) Annotate(message string) *Span {
if s.span == nil {
return s
}
s.span.Annotate(s.attributes, message)
s.attributes = []trace.Attribute{}
return s
}
func (s *Span) Annotatef(format string, addiations ...interface{}) *Span {
s.Annotate(fmt.Sprintf(format, addiations...))
return s
}
func toTraceAttribute(key string, value interface{}) (attr trace.Attribute, err error) {
switch value := value.(type) {
case bool:
return trace.BoolAttribute(key, value), nil
case string:
return trace.StringAttribute(key, value), nil
}
if valueInt, err := convertToInt64(value); err == nil {
return trace.Int64Attribute(key, valueInt), nil
}
return attr, errors.ThrowInternal(nil, "TRACE-jlq3s", "Attribute is not of type bool, string or int64")
}
func convertToInt64(value interface{}) (int64, error) {
valueString := fmt.Sprintf("%v", value)
return strconv.ParseInt(valueString, 10, 64)
code, msg, id, _ := grpc_errs.ExtractCaosError(err)
s.span.SetAttributes(label.Uint32("grpc_code", uint32(code)), label.String("grpc_msg", msg), label.String("error_id", id))
}

View File

@@ -4,18 +4,18 @@ import (
"context"
"net/http"
"go.opencensus.io/trace"
api_trace "go.opentelemetry.io/otel/api/trace"
sdk_trace "go.opentelemetry.io/otel/sdk/trace"
)
type Tracer interface {
Start() error
NewSpan(ctx context.Context, caller string) (context.Context, *Span)
NewClientSpan(ctx context.Context, caller string) (context.Context, *Span)
NewServerSpan(ctx context.Context, caller string) (context.Context, *Span)
NewClientInterceptorSpan(ctx context.Context, name string) (context.Context, *Span)
NewServerInterceptorSpan(ctx context.Context, name string) (context.Context, *Span)
NewSpanHTTP(r *http.Request, caller string) (*http.Request, *Span)
Sampler() trace.Sampler
Sampler() sdk_trace.Sampler
}
type Config interface {
@@ -24,9 +24,9 @@ type Config interface {
var T Tracer
func Sampler() trace.Sampler {
func Sampler() sdk_trace.Sampler {
if T == nil {
return trace.NeverSample()
return sdk_trace.NeverSample()
}
return T.Sampler()
}
@@ -79,3 +79,7 @@ func NewSpanHTTP(r *http.Request) (*http.Request, *Span) {
}
return T.NewSpanHTTP(r, GetCaller())
}
func TraceIDFromCtx(ctx context.Context) string {
return api_trace.SpanFromContext(ctx).SpanContext().TraceID.String()
}