Skip to content

Commit

Permalink
Merge pull request #2163 from tonistiigi/detect-otel-env
Browse files Browse the repository at this point in the history
  • Loading branch information
AkihiroSuda committed Jul 2, 2021
2 parents 9cf28dc + 1d0597e commit 1a7543a
Show file tree
Hide file tree
Showing 34 changed files with 1,563 additions and 45 deletions.
49 changes: 47 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/url"
"strings"

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -16,9 +17,12 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -44,6 +48,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error

var customTracer bool // allows manually setting disabling tracing even if tracer in context
var tracerProvider trace.TracerProvider
var tracerDelegate TracerDelegate

for _, o := range opts {
if _, ok := o.(*withFailFast); ok {
Expand All @@ -60,12 +65,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if wt, ok := o.(*withTracer); ok {
customTracer = true
tracerProvider = wt.tp

}
if wd, ok := o.(*withDialer); ok {
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
needDialer = false
}
if wt, ok := o.(*withTracerDelegate); ok {
tracerDelegate = wt
}
}

if !customTracer {
Expand All @@ -76,7 +83,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error

if tracerProvider != nil {
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))))
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
}

Expand Down Expand Up @@ -122,12 +129,27 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address)
}

c := &Client{
conn: conn,
}

if tracerDelegate != nil {
_ = c.setupDelegatedTracing(ctx, tracerDelegate) // ignore error
}

return c, nil
}

func (c *Client) setupDelegatedTracing(ctx context.Context, td TracerDelegate) error {
pd := otlptracegrpc.NewClient(c.conn)
e, err := otlptrace.New(ctx, pd)
if err != nil {
return nil
}
return td.SetSpanExporter(ctx, e)
}

func (c *Client) controlClient() controlapi.ControlClient {
return controlapi.NewControlClient(c.conn)
}
Expand Down Expand Up @@ -208,6 +230,20 @@ type withTracer struct {
tp trace.TracerProvider
}

type TracerDelegate interface {
SetSpanExporter(context.Context, sdktrace.SpanExporter) error
}

func WithTracerDelegate(td TracerDelegate) ClientOpt {
return &withTracerDelegate{
TracerDelegate: td,
}
}

type withTracerDelegate struct {
TracerDelegate
}

func resolveDialer(address string) (func(context.Context, string) (net.Conn, error), error) {
ch, err := connhelper.GetConnectionHelper(address)
if err != nil {
Expand All @@ -219,3 +255,12 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err
// basic dialer
return dialer, nil
}

func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
}
10 changes: 10 additions & 0 deletions cmd/buildctl/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,15 @@ func ResolveClient(c *cli.Context) (*client.Client, error) {

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
opts = append(opts, client.WithTracerProvider(span.TracerProvider()))

exp, err := detect.Exporter()
if err != nil {
return nil, err
}

if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
}
}

if caCert != "" || cert != "" || key != "" {
Expand Down
2 changes: 2 additions & 0 deletions cmd/buildctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down
58 changes: 57 additions & 1 deletion cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing/detect"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/version"
"github.com/moby/buildkit/worker"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -58,7 +60,10 @@ import (
"github.com/urfave/cli"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand All @@ -77,6 +82,7 @@ type workerInitializerOpt struct {
config *config.Config
configMetaData *toml.MetaData
sessionManager *session.Manager
traceSocket string
}

type workerInitializer struct {
Expand Down Expand Up @@ -304,7 +310,7 @@ func main() {
profiler.Attach(app)

if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "buildkitd: %s\n", err)
fmt.Fprintf(os.Stderr, "buildkitd: %+v\n", err)
os.Exit(1)
}
}
Expand Down Expand Up @@ -552,6 +558,10 @@ func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider) grpc.U
}
}()

if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}

resp, err = withTrace(ctx, req, info, handler)
if err != nil {
logrus.Errorf("%s returned error: %+v", info.FullMethod, stack.Formatter(err))
Expand Down Expand Up @@ -602,10 +612,25 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
if err != nil {
return nil, err
}

tc, err := detect.Exporter()
if err != nil {
return nil, err
}

var traceSocket string
if tc != nil {
traceSocket = filepath.Join(cfg.Root, "otel-grpc.sock")
if err := runTraceController(traceSocket, tc); err != nil {
return nil, err
}
}

wc, err := newWorkerController(c, workerInitializerOpt{
config: cfg,
configMetaData: md,
sessionManager: sessionManager,
traceSocket: traceSocket,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -635,6 +660,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}

return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Expand All @@ -643,6 +669,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
ResolveCacheImporterFuncs: remoteCacheImporterFuncs,
CacheKeyStorage: cacheStorage,
Entitlements: cfg.Entitlements,
TraceCollector: tc,
})
}

Expand Down Expand Up @@ -742,3 +769,32 @@ func getDNSConfig(cfg *config.DNSConfig) *oci.DNSConfig {
}
return dns
}

func runTraceController(p string, exp sdktrace.SpanExporter) error {
server := grpc.NewServer()
tracev1.RegisterTraceServiceServer(server, &traceCollector{exporter: exp})
uid := os.Getuid()
l, err := sys.GetLocalListener(p, uid, uid)
if err != nil {
return err
}
if err := os.Chmod(p, 0666); err != nil {
l.Close()
return err
}
go server.Serve(l)
return nil
}

type traceCollector struct {
*tracev1.UnimplementedTraceServiceServer
exporter sdktrace.SpanExporter
}

func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}
2 changes: 1 addition & 1 deletion cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}

opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem)
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket)
if err != nil {
return nil, err
}
Expand Down
21 changes: 21 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Opt struct {
Expand All @@ -35,9 +41,12 @@ type Opt struct {
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
TraceCollector sdktrace.SpanExporter
}

type Controller struct { // TODO: ControlService
*tracev1.UnimplementedTraceServiceServer

buildCount int64
opt Opt
solver *llbsolver.Solver
Expand Down Expand Up @@ -75,6 +84,7 @@ func NewController(opt Opt) (*Controller, error) {
func (c *Controller) Register(server *grpc.Server) error {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
tracev1.RegisterTraceServiceServer(server, c)
return nil
}

Expand Down Expand Up @@ -184,6 +194,17 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr
return eg2.Wait()
}

func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
if c.opt.TraceCollector == nil {
return nil, status.Errorf(codes.Unavailable, "trace collector not configured")
}
err := c.opt.TraceCollector.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}

func translateLegacySolveRequest(req *controlapi.SolveRequest) error {
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {
Expand Down
6 changes: 4 additions & 2 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ type containerdExecutor struct {
running map[string]chan error
mu sync.Mutex
apparmorProfile string
traceSocket string
}

// New creates a new executor backed by connection to containerd API
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string) executor.Executor {
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string, traceSocket string) executor.Executor {
// clean up old hosts/resolv.conf file. ignore errors
os.RemoveAll(filepath.Join(root, "hosts"))
os.RemoveAll(filepath.Join(root, "resolv.conf"))
Expand All @@ -54,6 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
dnsConfig: dnsConfig,
running: make(map[string]chan error),
apparmorProfile: apparmorProfile,
traceSocket: traceSocket,
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
opts = append(opts, containerdoci.WithCgroup(cgroupsPath))
}
processMode := oci.ProcessSandbox // FIXME(AkihiroSuda)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, w.traceSocket, opts...)
if err != nil {
return err
}
Expand Down

0 comments on commit 1a7543a

Please sign in to comment.