Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add warning log on dropped spans in batch span processor #3289

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
A non-default registerer can be used by passing the `WithRegisterer` option. (#3239)
- Added the `WithAggregationSelector` option to the `go.opentelemetry.io/otel/exporters/prometheus` package to change the default `AggregationSelector` used. (#3341)
- The Prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus` converts the `Resource` associated with metric exports into a `target_info` metric. (#3285)
- The batch span processor now tracks how many dropped spans it has already reported and logs a warning during batch export whenever additional spans have been dropped. (#3289)
prodion23 marked this conversation as resolved.
Show resolved Hide resolved

### Changed

Expand Down
7 changes: 7 additions & 0 deletions internal/global/internal_logging.go
Expand Up @@ -48,6 +48,13 @@ func Info(msg string, keysAndValues ...interface{}) {
globalLogger.V(1).Info(msg, keysAndValues...)
}

// Warn prints messages about potential issues with the API or SDK.
func Warn(msg string, keysAndValues ...interface{}) {
globalLoggerLock.RLock()
defer globalLoggerLock.RUnlock()
globalLogger.V(0).Info(msg, keysAndValues...)
prodion23 marked this conversation as resolved.
Show resolved Hide resolved
}

// Error prints messages about exceptional states of the API or SDK.
func Error(err error, msg string, keysAndValues ...interface{}) {
globalLoggerLock.RLock()
Expand Down
14 changes: 11 additions & 3 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -75,8 +75,9 @@ type batchSpanProcessor struct {
e SpanExporter
o BatchSpanProcessorOptions

queue chan ReadOnlySpan
dropped uint32
queue chan ReadOnlySpan
dropped uint32
knownDropped uint32
prodion23 marked this conversation as resolved.
Show resolved Hide resolved

batch []ReadOnlySpan
batchMutex sync.Mutex
Expand Down Expand Up @@ -265,7 +266,14 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
}

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
global.Debug("exporting spans", "count", len(bsp.batch))

dropped := atomic.LoadUint32(&bsp.dropped)
knownDropped := atomic.SwapUint32(&bsp.knownDropped, dropped)
if dropped > knownDropped {
global.Warn("dropped spans", "total_dropped", dropped, "known_dropped", knownDropped)
prodion23 marked this conversation as resolved.
Show resolved Hide resolved
}

err := bsp.e.ExportSpans(ctx, bsp.batch)

// A new batch is always created after exporting, even if the batch failed to be exported.
Expand Down
52 changes: 52 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -19,7 +19,11 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"go.opentelemetry.io/otel"
"os"
"regexp"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -49,6 +53,16 @@ type testBatchExporter struct {
err error
}

// logCounter is a test helper that wraps a logr.LogSink and additionally
// records the Info messages it receives so a test can inspect them.
type logCounter struct {
// LogSink is the embedded sink that every Info call is forwarded to.
logr.LogSink
// logs holds one formatted entry per Info call, in call order.
logs []string
}

// Info records the message together with its key/value pairs as a single
// formatted string, then forwards the call unchanged to the embedded sink.
func (lc *logCounter) Info(level int, msg string, keysAndValues ...interface{}) {
	// keysAndValues is deliberately passed as one slice operand so the
	// recorded entry has the form "msg[k1 v1 k2 v2]".
	entry := fmt.Sprint(msg, keysAndValues)
	lc.logs = append(lc.logs, entry)

	lc.LogSink.Info(level, msg, keysAndValues...)
}

func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -350,6 +364,12 @@ func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace
return sdktrace.NewBatchSpanProcessor(te, options...)
}

// createAndRegisterBatchSPNonBlocking builds a batch span processor from
// exactly the options carried by option, adding nothing to them.
//
// Tests should normally prefer the blocking variant to avoid flakiness.
// This helper exists for tests of the dropped-span warning, which
// presumably can only fire when the processor is allowed to drop spans
// instead of blocking (NOTE(review): confirm against createAndRegisterBatchSP).
func createAndRegisterBatchSPNonBlocking(option testOption, te *testBatchExporter) sdktrace.SpanProcessor {
	opts := option.o
	return sdktrace.NewBatchSpanProcessor(te, opts...)
}

prodion23 marked this conversation as resolved.
Show resolved Hide resolved
func generateSpan(t *testing.T, tr trace.Tracer, option testOption) {
sc := getSpanContext()

Expand Down Expand Up @@ -474,6 +494,38 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
assert.NoError(t, err)
}

func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) {
tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 0})
l := &logCounter{LogSink: tLog.GetSink()}
otel.SetLogger(logr.New(l))
global.SetLogger(logr.New(l))
prodion23 marked this conversation as resolved.
Show resolved Hide resolved

te := testBatchExporter{}
tp := basicTracerProvider(t)
option := testOption{
name: "default BatchSpanProcessorOptions",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(0),
sdktrace.WithMaxExportBatchSize(10),
},
genNumSpans: 100,
}
ssp := createAndRegisterBatchSPNonBlocking(option, &te)
prodion23 marked this conversation as resolved.
Show resolved Hide resolved
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOption")

// Force flush any held span batches
generateSpan(t, tr, option)
ssp.ForceFlush(context.Background())
prodion23 marked this conversation as resolved.
Show resolved Hide resolved

assert.Equal(t, len(l.logs), 1, "expected log message")
droppedLogs := l.logs[0]
assert.Regexp(t, regexp.MustCompile(`dropped spans\[total_dropped \d+ known_dropped \d+\]`), droppedLogs)
}

func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
te := testBatchExporter{
errors: []error{errors.New("fail to export")},
Expand Down