Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add current tracing context detection and exec propagation #2163

Merged
merged 8 commits into from
Jul 2, 2021
49 changes: 47 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/url"
"strings"

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -16,9 +17,12 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -44,6 +48,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error

var customTracer bool // allows manually setting disabling tracing even if tracer in context
var tracerProvider trace.TracerProvider
var tracerDelegate TracerDelegate

for _, o := range opts {
if _, ok := o.(*withFailFast); ok {
Expand All @@ -60,12 +65,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if wt, ok := o.(*withTracer); ok {
customTracer = true
tracerProvider = wt.tp

}
if wd, ok := o.(*withDialer); ok {
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
needDialer = false
}
if wt, ok := o.(*withTracerDelegate); ok {
tracerDelegate = wt
}
}

if !customTracer {
Expand All @@ -76,7 +83,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error

if tracerProvider != nil {
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))))
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))
}

Expand Down Expand Up @@ -122,12 +129,27 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address)
}

c := &Client{
conn: conn,
}

if tracerDelegate != nil {
_ = c.setupDelegatedTracing(ctx, tracerDelegate) // ignore error
}

return c, nil
}

func (c *Client) setupDelegatedTracing(ctx context.Context, td TracerDelegate) error {
pd := otlptracegrpc.NewClient(c.conn)
e, err := otlptrace.New(ctx, pd)
if err != nil {
return nil
}
return td.SetSpanExporter(ctx, e)
}

func (c *Client) controlClient() controlapi.ControlClient {
return controlapi.NewControlClient(c.conn)
}
Expand Down Expand Up @@ -208,6 +230,20 @@ type withTracer struct {
tp trace.TracerProvider
}

type TracerDelegate interface {
SetSpanExporter(context.Context, sdktrace.SpanExporter) error
}

func WithTracerDelegate(td TracerDelegate) ClientOpt {
return &withTracerDelegate{
TracerDelegate: td,
}
}

type withTracerDelegate struct {
TracerDelegate
}

func resolveDialer(address string) (func(context.Context, string) (net.Conn, error), error) {
ch, err := connhelper.GetConnectionHelper(address)
if err != nil {
Expand All @@ -219,3 +255,12 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err
// basic dialer
return dialer, nil
}

func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
}
10 changes: 10 additions & 0 deletions cmd/buildctl/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,15 @@ func ResolveClient(c *cli.Context) (*client.Client, error) {

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
opts = append(opts, client.WithTracerProvider(span.TracerProvider()))

exp, err := detect.Exporter()
if err != nil {
return nil, err
}

if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
}
}

if caCert != "" || cert != "" || key != "" {
Expand Down
2 changes: 2 additions & 0 deletions cmd/buildctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/stack"
_ "github.com/moby/buildkit/util/tracing/detect/delegated"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
Expand Down
58 changes: 57 additions & 1 deletion cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
"github.com/moby/buildkit/util/stack"
"github.com/moby/buildkit/util/tracing/detect"
_ "github.com/moby/buildkit/util/tracing/detect/jaeger"
_ "github.com/moby/buildkit/util/tracing/env"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/version"
"github.com/moby/buildkit/worker"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -58,7 +60,10 @@ import (
"github.com/urfave/cli"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand All @@ -77,6 +82,7 @@ type workerInitializerOpt struct {
config *config.Config
configMetaData *toml.MetaData
sessionManager *session.Manager
traceSocket string
}

type workerInitializer struct {
Expand Down Expand Up @@ -304,7 +310,7 @@ func main() {
profiler.Attach(app)

if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "buildkitd: %s\n", err)
fmt.Fprintf(os.Stderr, "buildkitd: %+v\n", err)
os.Exit(1)
}
}
Expand Down Expand Up @@ -552,6 +558,10 @@ func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider) grpc.U
}
}()

if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}

resp, err = withTrace(ctx, req, info, handler)
if err != nil {
logrus.Errorf("%s returned error: %+v", info.FullMethod, stack.Formatter(err))
Expand Down Expand Up @@ -602,10 +612,25 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
if err != nil {
return nil, err
}

tc, err := detect.Exporter()
if err != nil {
return nil, err
}

var traceSocket string
if tc != nil {
traceSocket = filepath.Join(cfg.Root, "otel-grpc.sock")
if err := runTraceController(traceSocket, tc); err != nil {
return nil, err
}
}

wc, err := newWorkerController(c, workerInitializerOpt{
config: cfg,
configMetaData: md,
sessionManager: sessionManager,
traceSocket: traceSocket,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -635,6 +660,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
"registry": registryremotecache.ResolveCacheImporterFunc(sessionManager, w.ContentStore(), resolverFn),
"local": localremotecache.ResolveCacheImporterFunc(sessionManager),
}

return control.NewController(control.Opt{
SessionManager: sessionManager,
WorkerController: wc,
Expand All @@ -643,6 +669,7 @@ func newController(c *cli.Context, cfg *config.Config, md *toml.MetaData) (*cont
ResolveCacheImporterFuncs: remoteCacheImporterFuncs,
CacheKeyStorage: cacheStorage,
Entitlements: cfg.Entitlements,
TraceCollector: tc,
})
}

Expand Down Expand Up @@ -742,3 +769,32 @@ func getDNSConfig(cfg *config.DNSConfig) *oci.DNSConfig {
}
return dns
}

func runTraceController(p string, exp sdktrace.SpanExporter) error {
server := grpc.NewServer()
tracev1.RegisterTraceServiceServer(server, &traceCollector{exporter: exp})
uid := os.Getuid()
l, err := sys.GetLocalListener(p, uid, uid)
if err != nil {
return err
}
if err := os.Chmod(p, 0666); err != nil {
l.Close()
return err
}
go server.Serve(l)
return nil
}

type traceCollector struct {
*tracev1.UnimplementedTraceServiceServer
exporter sdktrace.SpanExporter
}

func (t *traceCollector) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
err := t.exporter.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}
2 changes: 1 addition & 1 deletion cmd/buildkitd/main_containerd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func containerdWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([
if cfg.Snapshotter != "" {
snapshotter = cfg.Snapshotter
}
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, ctd.WithTimeout(60*time.Second))
opt, err := containerd.NewWorkerOpt(common.config.Root, cfg.Address, snapshotter, cfg.Namespace, cfg.Labels, dns, nc, common.config.Workers.Containerd.ApparmorProfile, parallelismSem, common.traceSocket, ctd.WithTimeout(60*time.Second))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/buildkitd/main_oci_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func ociWorkerInitializer(c *cli.Context, common workerInitializerOpt) ([]worker
parallelismSem = semaphore.NewWeighted(int64(cfg.MaxParallelism))
}

opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem)
opt, err := runc.NewWorkerOpt(common.config.Root, snFactory, cfg.Rootless, processMode, cfg.Labels, idmapping, nc, dns, cfg.Binary, cfg.ApparmorProfile, parallelismSem, common.traceSocket)
if err != nil {
return nil, err
}
Expand Down
21 changes: 21 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import (
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/throttle"
"github.com/moby/buildkit/util/tracing/transform"
"github.com/moby/buildkit/worker"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Opt struct {
Expand All @@ -35,9 +41,12 @@ type Opt struct {
ResolveCacheExporterFuncs map[string]remotecache.ResolveCacheExporterFunc
ResolveCacheImporterFuncs map[string]remotecache.ResolveCacheImporterFunc
Entitlements []string
TraceCollector sdktrace.SpanExporter
}

type Controller struct { // TODO: ControlService
*tracev1.UnimplementedTraceServiceServer

buildCount int64
opt Opt
solver *llbsolver.Solver
Expand Down Expand Up @@ -75,6 +84,7 @@ func NewController(opt Opt) (*Controller, error) {
func (c *Controller) Register(server *grpc.Server) error {
controlapi.RegisterControlServer(server, c)
c.gatewayForwarder.Register(server)
tracev1.RegisterTraceServiceServer(server, c)
return nil
}

Expand Down Expand Up @@ -184,6 +194,17 @@ func (c *Controller) Prune(req *controlapi.PruneRequest, stream controlapi.Contr
return eg2.Wait()
}

func (c *Controller) Export(ctx context.Context, req *tracev1.ExportTraceServiceRequest) (*tracev1.ExportTraceServiceResponse, error) {
if c.opt.TraceCollector == nil {
return nil, status.Errorf(codes.Unavailable, "trace collector not configured")
}
err := c.opt.TraceCollector.ExportSpans(ctx, transform.Spans(req.GetResourceSpans()))
if err != nil {
return nil, err
}
return &v1.ExportTraceServiceResponse{}, nil
}

func translateLegacySolveRequest(req *controlapi.SolveRequest) error {
// translates ExportRef and ExportAttrs to new Exports (v0.4.0)
if legacyExportRef := req.Cache.ExportRefDeprecated; legacyExportRef != "" {
Expand Down
6 changes: 4 additions & 2 deletions executor/containerdexecutor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ type containerdExecutor struct {
running map[string]chan error
mu sync.Mutex
apparmorProfile string
traceSocket string
}

// New creates a new executor backed by connection to containerd API
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string) executor.Executor {
func New(client *containerd.Client, root, cgroup string, networkProviders map[pb.NetMode]network.Provider, dnsConfig *oci.DNSConfig, apparmorProfile string, traceSocket string) executor.Executor {
// clean up old hosts/resolv.conf file. ignore errors
os.RemoveAll(filepath.Join(root, "hosts"))
os.RemoveAll(filepath.Join(root, "resolv.conf"))
Expand All @@ -54,6 +55,7 @@ func New(client *containerd.Client, root, cgroup string, networkProviders map[pb
dnsConfig: dnsConfig,
running: make(map[string]chan error),
apparmorProfile: apparmorProfile,
traceSocket: traceSocket,
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ func (w *containerdExecutor) Run(ctx context.Context, id string, root executor.M
opts = append(opts, containerdoci.WithCgroup(cgroupsPath))
}
processMode := oci.ProcessSandbox // FIXME(AkihiroSuda)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, opts...)
spec, cleanup, err := oci.GenerateSpec(ctx, meta, mounts, id, resolvConf, hostsFile, namespace, processMode, nil, w.apparmorProfile, w.traceSocket, opts...)
if err != nil {
return err
}
Expand Down