From b2dd3c98984e2725c042ebbf8fdc231e7cf4d8eb Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Thu, 28 Oct 2021 14:42:13 -0500 Subject: [PATCH 1/8] Empty queued spans when ForceFlush called Update the implementation of ForceFlush() to first ensure that all spans which are queued are added to the batch before calling export spans. Create a small ReadOnlySpan implementation which can be used as a marker that ForceFlush has been invoked and used to notify when all spans are ready to be exported. Fixes #2080. --- sdk/trace/batch_span_processor.go | 23 +++++++++++++++++++++++ sdk/trace/batch_span_processor_test.go | 22 ++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index db873c16f52..aa146bcfd00 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -153,10 +153,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { return err } +type forceFlushSpan struct { + ReadOnlySpan + flushed chan struct{} +} + // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { + flushCh := make(chan struct{}) + select { + case bsp.queue <- forceFlushSpan{flushed: flushCh}: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } + wait := make(chan error) go func() { wait <- bsp.exportSpans(ctx) @@ -248,6 +267,10 @@ func (bsp *batchSpanProcessor) processQueue() { otel.Handle(err) } case sd := <-bsp.queue: + if ffs, ok := sd.(forceFlushSpan); ok { + close(ffs.flushed) + return + } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 84cdf2e49e8..573c14c3677 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" @@ -457,3 +458,24 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { t.Errorf("expected %q error, got %v", want, got) } } + +func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { + ctx := context.Background() + + exp := tracetest.NewInMemoryExporter() + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + ) + + tracer := tp.Tracer("tracer") + + _, span := tracer.Start(ctx, "span") + span.End() + + err := tp.ForceFlush(ctx) + assert.NoError(t, err) + + // Expect well defined behavior due to calling ForceFlush + assert.Len(t, exp.GetSpans(), 1) +} From 0e621b8983d7e94cb0e25d49a360294147d4e2c9 Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Thu, 28 Oct 2021 20:17:55 -0500 Subject: [PATCH 2/8] Add a changelog entry. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a2e940c978..10633144422 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner` - The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`. - The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271) +- Update the batch span processor to export all queued spans when `ForceFlush` is called. (#2080) ### Added From c7e4c5167d99f02acf32d6bf9bb9da7d3aaac1e0 Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 29 Oct 2021 11:59:07 -0500 Subject: [PATCH 3/8] Update CHANGELOG.md Co-authored-by: Tyler Yahn --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10633144422..e9133d27beb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner` - The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`. - The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271) -- Update the batch span processor to export all queued spans when `ForceFlush` is called. (#2080) +- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335) ### Added From 4eb4f2ba66bb2e9557912e65eba8a2c4920d5a3b Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 29 Oct 2021 11:59:26 -0500 Subject: [PATCH 4/8] Update sdk/trace/batch_span_processor.go Co-authored-by: Tyler Yahn --- sdk/trace/batch_span_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index aa146bcfd00..0499db15c8f 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -269,7 +269,7 @@ func (bsp *batchSpanProcessor) processQueue() { case sd := <-bsp.queue: if ffs, ok := sd.(forceFlushSpan); ok { close(ffs.flushed) - return + continue } bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) From 0180f37986d8a353edfe4fd60205f3722f7c3abc Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 29 Oct 2021 12:12:13 -0500 Subject: [PATCH 5/8] Improve test case to verify multiple flushes. --- sdk/trace/batch_span_processor_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 573c14c3677..7ff36efb33a 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "errors" + "fmt" "sync" "testing" "time" @@ -470,12 +471,13 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) { tracer := tp.Tracer("tracer") - _, span := tracer.Start(ctx, "span") - span.End() + for i := 0; i < 10; i++ { + _, span := tracer.Start(ctx, fmt.Sprintf("span%d", i)) + span.End() - err := tp.ForceFlush(ctx) - assert.NoError(t, err) + err := tp.ForceFlush(ctx) + assert.NoError(t, err) - // Expect well defined behavior due to calling ForceFlush - assert.Len(t, exp.GetSpans(), 1) + assert.Len(t, exp.GetSpans(), i+1) + } } From cb2308129d66570f2c7372f780c5e4aebab82ece Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 29 Oct 2021 13:03:00 -0500 Subject: [PATCH 6/8] Refactor code to use enqueue. --- sdk/trace/batch_span_processor.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 0499db15c8f..841942daa00 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -22,6 +22,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) // Defaults for BatchSpanProcessorOptions. @@ -158,16 +159,16 @@ type forceFlushSpan struct { flushed chan struct{} } +func (f forceFlushSpan) SpanContext() trace.SpanContext { + return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) +} + // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { flushCh := make(chan struct{}) - select { - case bsp.queue <- forceFlushSpan{flushed: flushCh}: - case <-ctx.Done(): - return ctx.Err() - } + bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) select { case <-flushCh: @@ -319,6 +320,10 @@ func (bsp *batchSpanProcessor) drainQueue() { } func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { + bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) +} + +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) { if !sd.SpanContext().IsSampled() { return } @@ -344,9 +349,13 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { default: } - if bsp.o.BlockOnQueueFull { - bsp.queue <- sd - return + if block { + select { + case bsp.queue <- sd: + return + case <-ctx.Done(): + return + } } select { From bf24f4ca3789d1d9a676bda6208bfef40d6fa4ac Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Fri, 29 Oct 2021 20:19:03 -0500 Subject: [PATCH 7/8] Be more defensive on waiting for queue. Update the handling of the force flush span so we only wait on the channel if we were able to enqueue the span to the queue. --- sdk/trace/batch_span_processor.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 841942daa00..46f66049edd 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -168,13 +168,13 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { flushCh := make(chan struct{}) - bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) - - select { - case <-flushCh: - // Processed any items in queue prior to ForceFlush being called - case <-ctx.Done(): - return ctx.Err() + if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) { + select { + case <-flushCh: + // Processed any items in queue prior to ForceFlush being called + case <-ctx.Done(): + return ctx.Err() + } } wait := make(chan error) @@ -323,9 +323,9 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) } -func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) { +func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool { if !sd.SpanContext().IsSampled() { - return + return false } // This ensures the bsp.queue<- below does not panic as the @@ -345,22 +345,24 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R select { case <-bsp.stopCh: - return + return false default: } if block { select { case bsp.queue <- sd: - return + return true case <-ctx.Done(): - return + return false } } select { case bsp.queue <- sd: + return true default: atomic.AddUint32(&bsp.dropped, 1) } + return false } From c03a61a95f412718eeae0053c419e9c5c389050c Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Wed, 3 Nov 2021 09:02:08 -0500 Subject: [PATCH 8/8] Fix linter. --- sdk/trace/batch_span_processor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 7ff36efb33a..d40ea27df77 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace"