Skip to content

Commit

Permalink
Merge pull request #2152 from tonistiigi/otel
Browse files Browse the repository at this point in the history
Add opentelemetry support
  • Loading branch information
AkihiroSuda committed Jun 16, 2021
2 parents 2ccc0e2 + b10f259 commit 0164c06
Show file tree
Hide file tree
Showing 688 changed files with 99,444 additions and 35,062 deletions.
18 changes: 12 additions & 6 deletions client/client.go
Expand Up @@ -10,15 +10,16 @@ import (

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/client/connhelper"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -54,8 +55,9 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
needWithInsecure = false
}
if wt, ok := o.(*withTracer); ok {
unary = append(unary, otgrpc.OpenTracingClientInterceptor(wt.tracer, otgrpc.LogPayloads()))
stream = append(stream, otgrpc.OpenTracingStreamClientInterceptor(wt.tracer))
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators)))
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(wt), otelgrpc.WithPropagators(propagators)))
}
if wd, ok := o.(*withDialer); ok {
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
Expand Down Expand Up @@ -182,12 +184,16 @@ func loadCredentials(opts *withCredentials) (grpc.DialOption, error) {
return grpc.WithTransportCredentials(credentials.NewTLS(cfg)), nil
}

func WithTracer(t opentracing.Tracer) ClientOpt {
func WithTracer(t trace.Tracer) ClientOpt {
return &withTracer{t}
}

type withTracer struct {
tracer opentracing.Tracer
tracer trace.Tracer
}

func (wt *withTracer) Tracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer {
return wt.tracer
}

func resolveDialer(address string) (func(context.Context, string) (net.Conn, error), error) {
Expand Down
6 changes: 3 additions & 3 deletions client/solve.go
Expand Up @@ -22,10 +22,10 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/entitlements"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
fstypes "github.com/tonistiigi/fsutil/types"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -93,8 +93,8 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
statusContext, cancelStatus := context.WithCancel(context.Background())
defer cancelStatus()

if span := opentracing.SpanFromContext(ctx); span != nil {
statusContext = opentracing.ContextWithSpan(statusContext, span)
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
statusContext = trace.ContextWithSpan(statusContext, span)
}

s := opt.SharedSession
Expand Down
4 changes: 2 additions & 2 deletions cmd/buildctl/common/common.go
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/moby/buildkit/client"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/trace"
)

// ResolveClient resolves a client from CLI args
Expand Down Expand Up @@ -64,7 +64,7 @@ func ResolveClient(c *cli.Context) (*client.Client, error) {

ctx := CommandContext(c)

if span := opentracing.SpanFromContext(ctx); span != nil {
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
opts = append(opts, client.WithTracer(span.Tracer()))
}

Expand Down
59 changes: 17 additions & 42 deletions cmd/buildctl/common/trace.go
Expand Up @@ -2,42 +2,25 @@ package common

import (
"context"
"io"
"os"
"strings"

"github.com/moby/buildkit/util/appcontext"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
jaeger "github.com/uber/jaeger-client-go"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

func getTracer() (opentracing.Tracer, io.Closer) {
if traceAddr := os.Getenv("JAEGER_TRACE"); traceAddr != "" {
tr, err := jaeger.NewUDPTransport(traceAddr, 0)
if err != nil {
panic(err)
}

// metricsFactory := prometheus.New()
return jaeger.NewTracer(
"buildctl",
jaeger.NewConstSampler(true),
jaeger.NewRemoteReporter(tr),
)
}

return opentracing.NoopTracer{}, &nopCloser{}
}

func AttachAppContext(app *cli.App) {
func AttachAppContext(app *cli.App) error {
ctx := appcontext.Context()

tracer, closer := getTracer()
tracer, err := detect.Tracer()
if err != nil {
return err
}

var span opentracing.Span
var span trace.Span

for i, cmd := range app.Commands {
func(before cli.BeforeFunc) {
Expand All @@ -49,10 +32,9 @@ func AttachAppContext(app *cli.App) {
}
}

span = tracer.StartSpan(name)
span.LogFields(log.String("command", strings.Join(os.Args, " ")))

ctx = opentracing.ContextWithSpan(ctx, span)
ctx, span = tracer.Start(ctx, name, trace.WithAttributes(
attribute.Array("command", attribute.ArrayValue(os.Args)),
))

clicontext.App.Metadata["context"] = ctx
return nil
Expand All @@ -62,7 +44,7 @@ func AttachAppContext(app *cli.App) {

app.ExitErrHandler = func(clicontext *cli.Context, err error) {
if span != nil {
ext.Error.Set(span, true)
span.SetStatus(codes.Error, err.Error())
}
cli.HandleExitCoder(err)
}
Expand All @@ -75,20 +57,13 @@ func AttachAppContext(app *cli.App) {
}
}
if span != nil {
span.Finish()
span.End()
}
return closer.Close()
return detect.Shutdown(context.TODO())
}

return nil
}

func CommandContext(c *cli.Context) context.Context {
return c.App.Metadata["context"].(context.Context)
}

type nopCloser struct {
}

func (*nopCloser) Close() error {
return nil
}
38 changes: 27 additions & 11 deletions cmd/buildctl/main.go
Expand Up @@ -13,15 +13,20 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"go.opentelemetry.io/otel"
)

func init() {
apicaps.ExportedProduct = "buildkit"

stack.SetVersionInfo(version.Version, version.Revision)

// do not log tracing errors to stdio
otel.SetErrorHandler(skipErrors{})
}

func main() {
Expand Down Expand Up @@ -98,19 +103,30 @@ func main() {
return nil
}

bccommon.AttachAppContext(app)
if err := bccommon.AttachAppContext(app); err != nil {
handleErr(debugEnabled, err)
}

profiler.Attach(app)

if err := app.Run(os.Args); err != nil {
for _, s := range errdefs.Sources(err) {
s.Print(os.Stderr)
}
if debugEnabled {
fmt.Fprintf(os.Stderr, "error: %+v", stack.Formatter(err))
} else {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
os.Exit(1)
handleErr(debugEnabled, app.Run(os.Args))
}

func handleErr(debug bool, err error) {
if err == nil {
return
}
for _, s := range errdefs.Sources(err) {
s.Print(os.Stderr)
}
if debug {
fmt.Fprintf(os.Stderr, "error: %+v", stack.Formatter(err))
} else {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
os.Exit(1)
}

type skipErrors struct{}

func (skipErrors) Handle(err error) {}
48 changes: 38 additions & 10 deletions cmd/buildkitd/main.go
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/docker/go-connections/sockets"
"github.com/gofrs/flock"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/moby/buildkit/cache/remotecache"
inlineremotecache "github.com/moby/buildkit/cache/remotecache/inline"
localremotecache "github.com/moby/buildkit/cache/remotecache/local"
Expand All @@ -48,12 +47,18 @@ import (
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing/detect"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
"github.com/moby/buildkit/version"
"github.com/moby/buildkit/worker"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand All @@ -62,10 +67,16 @@ func init() {
apicaps.ExportedProduct = "buildkit"
stack.SetVersionInfo(version.Version, version.Revision)

// OTEL error handling is broken https://github.com/open-telemetry/opentelemetry-go/pull/1851
// remove this with otel update
otel.SetErrorHandler(skipErrors{})

seed.WithTimeAndRand()
reexec.Init()
}

var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

type workerInitializerOpt struct {
config *config.Config
configMetaData *toml.MetaData
Expand Down Expand Up @@ -207,8 +218,16 @@ func main() {
return err
}
}
unary := grpc_middleware.ChainUnaryServer(unaryInterceptor(ctx), grpcerrors.UnaryServerInterceptor)
stream := grpc_middleware.ChainStreamServer(otgrpc.OpenTracingStreamServerInterceptor(tracer), grpcerrors.StreamServerInterceptor)

tracer, err := detect.Tracer()
if err != nil {
return err
}

streamTracer := otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(constTracerProvider{tracer: tracer}), otelgrpc.WithPropagators(propagators))

unary := grpc_middleware.ChainUnaryServer(unaryInterceptor(ctx, tracer), grpcerrors.UnaryServerInterceptor)
stream := grpc_middleware.ChainStreamServer(streamTracer, grpcerrors.StreamServerInterceptor)

opts := []grpc.ServerOption{grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream)}
server := grpc.NewServer(opts...)
Expand Down Expand Up @@ -282,11 +301,8 @@ func main() {
return err
}

app.After = func(context *cli.Context) error {
if closeTracer != nil {
return closeTracer.Close()
}
return nil
app.After = func(_ *cli.Context) error {
return detect.Shutdown(context.TODO())
}

profiler.Attach(app)
Expand Down Expand Up @@ -525,8 +541,8 @@ func getListener(addr string, uid, gid int, tlsConfig *tls.Config) (net.Listener
}
}

func unaryInterceptor(globalCtx context.Context) grpc.UnaryServerInterceptor {
withTrace := otgrpc.OpenTracingServerInterceptor(tracer, otgrpc.LogPayloads())
func unaryInterceptor(globalCtx context.Context, tracer trace.Tracer) grpc.UnaryServerInterceptor {
withTrace := otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(constTracerProvider{tracer: tracer}), otelgrpc.WithPropagators(propagators))

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -730,3 +746,15 @@ func getDNSConfig(cfg *config.DNSConfig) *oci.DNSConfig {
}
return dns
}

type constTracerProvider struct {
tracer trace.Tracer
}

func (tp constTracerProvider) Tracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer {
return tp.tracer
}

type skipErrors struct{}

func (skipErrors) Handle(err error) {}
31 changes: 0 additions & 31 deletions cmd/buildkitd/trace.go

This file was deleted.

0 comments on commit 0164c06

Please sign in to comment.