diff --git a/CHANGELOG.md b/CHANGELOG.md index daa5584bed7..0a5b8abdfc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added +- The `BatchSpanProcessor` from `go.opentelemetry.io/otel/sdk/trace` logs the spans dropped in a batch every export with verbosity `1`. (#3289) - The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package. This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387) - Add Instrumentation Scope and Version as info metric and label in Prometheus exporter. diff --git a/sdk/go.mod b/sdk/go.mod index 022ad8ce1f2..a902115d992 100644 --- a/sdk/go.mod +++ b/sdk/go.mod @@ -6,6 +6,7 @@ replace go.opentelemetry.io/otel => ../ require ( github.com/go-logr/logr v1.2.3 + github.com/go-logr/stdr v1.2.2 github.com/google/go-cmp v0.5.9 github.com/stretchr/testify v1.8.1 go.opentelemetry.io/otel v1.11.1 @@ -15,7 +16,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index a2d7db49001..127dad79dd0 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -75,8 +75,9 @@ type batchSpanProcessor struct { e SpanExporter o BatchSpanProcessorOptions - queue chan ReadOnlySpan - dropped uint32 + queue chan ReadOnlySpan + dropped uint32 + reportedDropped uint32 batch []ReadOnlySpan batchMutex sync.Mutex @@ -265,7 +266,15 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { } if l := len(bsp.batch); l > 0 { - global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + global.Debug("exporting spans", "count", len(bsp.batch)) + + dropped := 
atomic.LoadUint32(&bsp.dropped) + reportedDropped := atomic.SwapUint32(&bsp.reportedDropped, dropped) + if dropped > reportedDropped { + droppedThisBatch := dropped - reportedDropped + global.Info("dropped spans", "total", dropped, "this_batch", droppedThisBatch) + } + err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index a033b6a0082..480d47b3083 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,18 +19,23 @@ import ( "encoding/binary" "errors" "fmt" + "log" "os" + "regexp" "sync" + "sync/atomic" "testing" "time" - ottest "go.opentelemetry.io/otel/internal/internaltest" - + "github.com/go-logr/logr" "github.com/go-logr/logr/funcr" + "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" + ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/sdk/internal/env" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" @@ -313,19 +318,8 @@ func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) { } } -type stuckExporter struct { - testBatchExporter -} - -// ExportSpans waits for ctx to expire and returns that error. -func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { - <-ctx.Done() - e.err = ctx.Err() - return ctx.Err() -} - func TestBatchSpanProcessorExportTimeout(t *testing.T) { - exp := new(stuckExporter) + exp := newBlockingExporter() bsp := sdktrace.NewBatchSpanProcessor( exp, // Set a non-zero export timeout so a deadline is set. 
@@ -474,6 +468,130 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
 	assert.NoError(t, err)
 }
 
+// blockingExporter is a test exporter whose ExportSpans blocks until one of the
+// following happens:
+// a) The context is canceled.
+// b) unblock() is called.
+// c) The exporter is shut down with Shutdown().
+type blockingExporter struct {
+	testBatchExporter
+	// block releases one blocked ExportSpans call per value received. It is
+	// never closed, so unblock cannot panic when called after Shutdown.
+	block chan struct{}
+	// done is closed by Shutdown and releases every ExportSpans call.
+	done chan struct{}
+	// close guarantees done is closed at most once.
+	close sync.Once
+}
+
+// newBlockingExporter returns a blockingExporter with its channels initialized.
+func newBlockingExporter() *blockingExporter {
+	return &blockingExporter{
+		block: make(chan struct{}),
+		done:  make(chan struct{}),
+	}
+}
+
+// ExportSpans blocks until unblock or Shutdown is called, in which case it
+// returns nil, or until ctx is done, in which case it records and returns
+// ctx.Err().
+func (e *blockingExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
+	select {
+	case <-e.block:
+	case <-e.done:
+	case <-ctx.Done():
+		e.err = ctx.Err()
+		return ctx.Err()
+	}
+	return nil
+}
+
+// Shutdown releases all blocked and future ExportSpans calls. It is safe to
+// call more than once and always returns nil.
+func (e *blockingExporter) Shutdown(_ context.Context) error {
+	e.close.Do(func() {
+		close(e.done)
+	})
+	return nil
+}
+
+// unblock releases at most one ExportSpans call that is currently blocked. It
+// never blocks itself: if no call is waiting, it does nothing.
+func (e *blockingExporter) unblock() {
+	select {
+	case e.block <- struct{}{}:
+	default:
+	}
+}
+
+// matchingLogger records whether any emitted log line matched regex.
+type matchingLogger struct {
+	// This can be an atomic.Bool when we drop 1.18 support
+	_found atomic.Value
+	regex  *regexp.Regexp
+}
+
+// Logger resets the match state and returns a logr.Logger, at verbosity 10,
+// that flags the state as found when a log line's arguments match regex.
+func (l *matchingLogger) Logger() logr.Logger {
+	l._found.Store(false)
+	return funcr.New(func(prefix, args string) {
+		if l.regex.MatchString(args) {
+			l._found.Store(true)
+		}
+	}, funcr.Options{Verbosity: 10})
+}
+
+// found reports whether a matching log line has been seen. It reports false if
+// Logger has not been called yet.
+func (l *matchingLogger) found() bool {
+	// Load returns nil before the first Store; the two-value assertion avoids
+	// a panic in that case.
+	ok, _ := l._found.Load().(bool)
+	return ok
+}
+
+// TestBatchSpanProcessorLogsWarningOnNewDropSpans generates spans against a
+// blocking exporter until a "dropped spans" log line is observed.
+func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) {
+	tLog := &matchingLogger{regex: regexp.MustCompile(`"msg"="dropped spans" "total"=\d+ "this_batch"=\d+`)}
+	otel.SetLogger(tLog.Logger())
+	// Reset the global logger so other tests are not impacted. Registering
+	// this as a cleanup makes sure it runs even if the test exits early.
+	t.Cleanup(func() {
+		otel.SetLogger(stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)))
+	})
+
+	te := newBlockingExporter()
+	tp := basicTracerProvider(t)
+	option := testOption{
+		name: "default BatchSpanProcessorOptions",
+		o: []sdktrace.BatchSpanProcessorOption{
+			sdktrace.WithMaxQueueSize(0),
+			sdktrace.WithMaxExportBatchSize(1),
+		},
+		genNumSpans: 100,
+	}
+	ssp := sdktrace.NewBatchSpanProcessor(te, option.o...)
+	if ssp == nil {
+		t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
+	}
+	tp.RegisterSpanProcessor(ssp)
+	tr := tp.Tracer("BatchSpanProcessorWithOption")
+
+	assert.Eventually(t, func() bool {
+		generateSpan(t, tr, option)
+		te.unblock()
+
+		return tLog.found()
+	}, time.Second, time.Millisecond)
+
+	// We shut down the exporter to clear any blocked ExportSpans().
+	// If we don't shut down the exporter first the SpanProcessor will block waiting for Exports to complete.
+	_ = te.Shutdown(context.Background())
+	_ = ssp.Shutdown(context.Background())
+}
+
 func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
 	te := testBatchExporter{
 		errors: []error{errors.New("fail to export")},