Skip to content

Commit

Permalink
client: pass delegated exporter as parameter
Browse files Browse the repository at this point in the history
Avoid client package having dependency on global detect package.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Jun 21, 2021
1 parent 0234ab2 commit 51128c1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 32 deletions.
41 changes: 23 additions & 18 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"net"
"net/url"
"strings"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -50,6 +48,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error

var customTracer bool // allows manually setting disabling tracing even if tracer in context
var tracerProvider trace.TracerProvider
var tracerDelegate TracerDelegate

for _, o := range opts {
if _, ok := o.(*withFailFast); ok {
Expand All @@ -71,6 +70,9 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
gopts = append(gopts, grpc.WithContextDialer(wd.dialer))
needDialer = false
}
if wt, ok := o.(*withTracerDelegate); ok {
tracerDelegate = wt
}
}

if !customTracer {
Expand Down Expand Up @@ -132,31 +134,20 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
conn: conn,
}

_ = c.setupDelegatedTracing(ctx) // ignore error
if tracerDelegate != nil {
_ = c.setupDelegatedTracing(ctx, tracerDelegate) // ignore error
}

return c, nil
}

func (c *Client) setupDelegatedTracing(ctx context.Context) error {
exp, err := detect.Exporter()
if err != nil {
return err
}

log.Printf("exporter %v %T", exp, exp)
del, ok := exp.(interface {
SetDelegate(context.Context, sdktrace.SpanExporter) error
})
if !ok {
return nil
}

func (c *Client) setupDelegatedTracing(ctx context.Context, td TracerDelegate) error {
pd := otlptracegrpc.NewClient(c.conn)
e, err := otlptrace.New(ctx, pd)
if err != nil {
return nil
}
return del.SetDelegate(ctx, e)
return td.SetSpanExporter(ctx, e)
}

func (c *Client) controlClient() controlapi.ControlClient {
Expand Down Expand Up @@ -239,6 +230,20 @@ type withTracer struct {
tp trace.TracerProvider
}

type TracerDelegate interface {
SetSpanExporter(context.Context, sdktrace.SpanExporter) error
}

func WithTracerDelegate(td TracerDelegate) ClientOpt {
return &withTracerDelegate{
TracerDelegate: td,
}
}

type withTracerDelegate struct {
TracerDelegate
}

func resolveDialer(address string) (func(context.Context, string) (net.Conn, error), error) {
ch, err := connhelper.GetConnectionHelper(address)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions cmd/buildctl/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -66,6 +67,15 @@ func ResolveClient(c *cli.Context) (*client.Client, error) {

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
opts = append(opts, client.WithTracerProvider(span.TracerProvider()))

exp, err := detect.Exporter()
if err != nil {
return nil, err
}

if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td))
}
}

if caCert != "" || cert != "" || key != "" {
Expand Down
35 changes: 21 additions & 14 deletions util/tracing/detect/delegated/delegated.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

const maxBuffer = 128
const maxBuffer = 256

var exp = &Exporter{}

Expand All @@ -19,9 +19,9 @@ func init() {
}

type Exporter struct {
mu sync.Mutex
delegate sdktrace.SpanExporter
buffer []sdktrace.ReadOnlySpan
mu sync.Mutex
exporters []sdktrace.SpanExporter
buffer []sdktrace.ReadOnlySpan
}

var _ sdktrace.SpanExporter = &Exporter{}
Expand All @@ -30,8 +30,14 @@ func (e *Exporter) ExportSpans(ctx context.Context, ss []sdktrace.ReadOnlySpan)
e.mu.Lock()
defer e.mu.Unlock()

if e.delegate != nil {
return e.delegate.ExportSpans(ctx, ss)
var err error
for _, e := range e.exporters {
if err1 := e.ExportSpans(ctx, ss); err1 != nil {
err = err1
}
}
if err != nil {
return err
}

if len(e.buffer) > maxBuffer {
Expand All @@ -46,23 +52,24 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.delegate != nil {
return e.delegate.Shutdown(ctx)
var err error
for _, e := range e.exporters {
if err1 := e.Shutdown(ctx); err1 != nil {
err = err1
}
}

return nil
return err
}

func (e *Exporter) SetDelegate(ctx context.Context, del sdktrace.SpanExporter) error {
func (e *Exporter) SetSpanExporter(ctx context.Context, exp sdktrace.SpanExporter) error {
e.mu.Lock()
defer e.mu.Unlock()

e.delegate = del
e.exporters = append(e.exporters, exp)

if len(e.buffer) > 0 {
err := e.delegate.ExportSpans(ctx, e.buffer)
e.buffer = nil
return err
return exp.ExportSpans(ctx, e.buffer)
}
return nil
}

0 comments on commit 51128c1

Please sign in to comment.