Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add warning log on dropped spans in batch span processor #3289

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added

- The `BatchSpanProcessor` from `go.opentelemetry.io/otel/sdk/trace` logs the spans dropped in a batch every export with verbosity `1`. (#3289)
- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package.
This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387)
- Add Instrumentation Scope and Version as info metric and label in Prometheus exporter.
Expand Down
2 changes: 1 addition & 1 deletion sdk/go.mod
Expand Up @@ -6,6 +6,7 @@ replace go.opentelemetry.io/otel => ../

require (
github.com/go-logr/logr v1.2.3
github.com/go-logr/stdr v1.2.2
github.com/google/go-cmp v0.5.9
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/otel v1.11.1
Expand All @@ -15,7 +16,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
15 changes: 12 additions & 3 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -75,8 +75,9 @@ type batchSpanProcessor struct {
e SpanExporter
o BatchSpanProcessorOptions

queue chan ReadOnlySpan
dropped uint32
queue chan ReadOnlySpan
dropped uint32
reportedDropped uint32

batch []ReadOnlySpan
batchMutex sync.Mutex
Expand Down Expand Up @@ -265,7 +266,15 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
}

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
global.Debug("exporting spans", "count", len(bsp.batch))

dropped := atomic.LoadUint32(&bsp.dropped)
reportedDropped := atomic.SwapUint32(&bsp.reportedDropped, dropped)
if dropped > reportedDropped {
droppedThisBatch := dropped - reportedDropped
global.Info("dropped spans", "total", dropped, "this_batch", droppedThisBatch)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not call global.Warn?

Suggested change
global.Info("dropped spans", "total", dropped, "this_batch", droppedThisBatch)
global.Warn("dropped spans", "total", dropped, "this_batch", droppedThisBatch)

}

err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
Expand Down
115 changes: 101 additions & 14 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -19,18 +19,23 @@ import (
"encoding/binary"
"errors"
"fmt"
"log"
"os"
"regexp"
"sync"
"sync/atomic"
"testing"
"time"

ottest "go.opentelemetry.io/otel/internal/internaltest"

"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
ottest "go.opentelemetry.io/otel/internal/internaltest"
"go.opentelemetry.io/otel/sdk/internal/env"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
Expand Down Expand Up @@ -313,19 +318,8 @@ func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) {
}
}

type stuckExporter struct {
testBatchExporter
}

// ExportSpans waits for ctx to expire and returns that error.
func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
<-ctx.Done()
e.err = ctx.Err()
return ctx.Err()
}

func TestBatchSpanProcessorExportTimeout(t *testing.T) {
exp := new(stuckExporter)
exp := newBlockingExporter()
bsp := sdktrace.NewBatchSpanProcessor(
exp,
// Set a non-zero export timeout so a deadline is set.
Expand Down Expand Up @@ -474,6 +468,99 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
assert.NoError(t, err)
}

// blockingExporter will block on ExportSpans until one of the following happens:
// a) The context is Canceled.
// b) unblock() is called.
// c) The exporter is ShutDown().
type blockingExporter struct {
testBatchExporter
block chan struct{}
close sync.Once
}

func newBlockingExporter() *blockingExporter {
return &blockingExporter{block: make(chan struct{})}
}

func (e *blockingExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
select {
case <-e.block:
case <-ctx.Done():
e.err = ctx.Err()
return ctx.Err()
}
return nil
}
func (e *blockingExporter) Shutdown(_ context.Context) error {
e.close.Do(func() {
close(e.block)
})
return nil
}
func (e *blockingExporter) unblock() {
select {
case e.block <- struct{}{}:
default:
}
}

type matchingLogger struct {
// This can be an atomic.Bool when we drop 1.18 support
_found atomic.Value
regex *regexp.Regexp
}

func (l *matchingLogger) Logger() logr.Logger {
l._found.Store(false)
return funcr.New(func(prefix, args string) {
if l.regex.MatchString(args) {
l._found.Store(true)
}
}, funcr.Options{Verbosity: 10})
}

func (l *matchingLogger) found() bool {
return l._found.Load().(bool)
}

func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) {
tLog := &matchingLogger{regex: regexp.MustCompile(`"msg"="dropped spans" "total"=\d+ "this_batch"=\d+`)}
otel.SetLogger(tLog.Logger())

te := newBlockingExporter()
tp := basicTracerProvider(t)
option := testOption{
name: "default BatchSpanProcessorOptions",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(1),
},
genNumSpans: 100,
}
ssp := sdktrace.NewBatchSpanProcessor(te, option.o...)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOption")

assert.Eventually(t, func() bool {
generateSpan(t, tr, option)
te.unblock()

return tLog.found()
}, time.Second, time.Millisecond)

// We shut down the exporter to clear any blocked ExportSpans().
// If we don't shut down the exporter first the SpanProcessor will block waiting for Exports to complete.
_ = te.Shutdown(context.Background())
_ = ssp.Shutdown(context.Background())

// Reset the global logger so other tests are not impacted
logger := stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile))
otel.SetLogger(logger)
}

func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
te := testBatchExporter{
errors: []error{errors.New("fail to export")},
Expand Down