diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f35c83d76d..5b8164147ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner` - The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`. - The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271) +- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335) ### Added diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index db873c16f52..46f66049edd 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -22,6 +22,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) // Defaults for BatchSpanProcessorOptions. @@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { return err } +type forceFlushSpan struct { + ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + wait := make(chan error) go func() { wait <- bsp.exportSpans(ctx) @@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() { otel.Handle(err) } case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize @@ -296,8 +320,12 @@ func (bsp *batchSpanProcessor) drainQueue() { } func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool { if !sd.SpanContext().IsSampled() { - return + return false } // This ensures the bsp.queue<- below does not panic as the @@ -317,18 +345,24 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { select { case <-bsp.stopCh: - return + return false default: } - if bsp.o.BlockOnQueueFull { - bsp.queue <- sd - return + if block { + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } } select { case bsp.queue <- sd: + return true default: atomic.AddUint32(&bsp.dropped, 1) } + return false } diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 84cdf2e49e8..d40ea27df77 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "errors" + "fmt" "sync" "testing" "time" @@ -25,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -457,3 +460,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { t.Errorf("expected %q error, got %v", want, got) } } + +func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { + ctx := context.Background() + + exp := tracetest.NewInMemoryExporter() + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + + tracer := tp.Tracer("tracer") + + for i := 0; i < 10; i++ { + _, span := tracer.Start(ctx, fmt.Sprintf("span%d", i)) + span.End() + + err := tp.ForceFlush(ctx) + assert.NoError(t, err) + + assert.Len(t, exp.GetSpans(), i+1) + } +}