From 6d2aeb0dc3dd53af9b825ced8067f7fa2b3bca7a Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 5 Nov 2021 10:34:57 -0500 Subject: [PATCH] Empty queued spans when ForceFlush called (#2335) * Empty queued spans when ForceFlush called Update the implementation of ForceFlush() to first ensure that all spans which are queued are added to the batch before calling export spans. Create a small ReadOnlySpan implementation which can be used as a marker that ForceFlush has been invoked and used to notify when all spans are ready to be exported. Fixes #2080. * Add a changelog entry. * Update CHANGELOG.md Co-authored-by: Tyler Yahn * Update sdk/trace/batch_span_processor.go Co-authored-by: Tyler Yahn * Improve test case to verify multiple flushes. * Refactor code to use enqueue. * Be more defensive on waiting for queue. Update the handling of the force flush span so we only wait on the channel if we were able to enqueue the span to the queue. * Fix linter. Co-authored-by: Tyler Yahn Co-authored-by: Anthony Mirabella --- CHANGELOG.md | 1 + sdk/trace/batch_span_processor.go | 44 +++++++++++++++++++++++--- sdk/trace/batch_span_processor_test.go | 25 +++++++++++++++ 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f35c83d76d..5b8164147ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner` - The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`. - The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271) +- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335) ### Added diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index db873c16f52..46f66049edd 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -22,6 +22,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) // Defaults for BatchSpanProcessorOptions. @@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { return err } +type forceFlushSpan struct { + ReadOnlySpan + flushed chan struct{} +} + +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { + flushCh := make(chan struct{}) + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + } + wait := make(chan error) go func() { wait <- bsp.exportSpans(ctx) @@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() { otel.Handle(err) } case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + continue + } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize @@ -296,8 +320,12 @@ func (bsp *batchSpanProcessor) drainQueue() { } func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool { if !sd.SpanContext().IsSampled() { - return + return false } // This ensures the bsp.queue<- below does not panic as the @@ -317,18 +345,24 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { select { case <-bsp.stopCh: - return + return false default: } - if bsp.o.BlockOnQueueFull { - bsp.queue <- sd - return + if block { + select { + case bsp.queue <- sd: + return true + case <-ctx.Done(): + return false + } } select { case bsp.queue <- sd: + return true default: atomic.AddUint32(&bsp.dropped, 1) } + return false } diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 84cdf2e49e8..d40ea27df77 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "errors" + "fmt" "sync" "testing" "time" @@ -25,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -457,3 +460,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { t.Errorf("expected %q error, got %v", want, got) } } + +func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { + ctx := context.Background() + + exp := tracetest.NewInMemoryExporter() + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + + tracer := tp.Tracer("tracer") + + for i := 0; i < 10; i++ { + _, span := tracer.Start(ctx, fmt.Sprintf("span%d", i)) + span.End() + + err := tp.ForceFlush(ctx) + assert.NoError(t, err) + + assert.Len(t, exp.GetSpans(), i+1) + } +}