Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty queued spans when ForceFlush called #2335

Merged
merged 9 commits into from Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
- The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271)
- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335)

### Added

Expand Down
38 changes: 35 additions & 3 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// Defaults for BatchSpanProcessorOptions.
Expand Down Expand Up @@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}

type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}

func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using enqueue requires that the span being queued is marked as sampled.

}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass true as the last argument here since we want to block for this span to be queued.


select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}

wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
Expand Down Expand Up @@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
Expand Down Expand Up @@ -296,6 +320,10 @@ func (bsp *batchSpanProcessor) drainQueue() {
}

func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to work in the ForceFlush method, it was needed to pass a context (to allow cancellation).

}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) {
if !sd.SpanContext().IsSampled() {
return
}
Expand All @@ -321,9 +349,13 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
default:
}

if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
return
if block {
select {
case bsp.queue <- sd:
return
case <-ctx.Done():
return
}
}

select {
Expand Down
24 changes: 24 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -457,3 +459,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}

func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)

tracer := tp.Tracer("tracer")

for i := 0; i < 10; i++ {
_, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
span.End()

err := tp.ForceFlush(ctx)
assert.NoError(t, err)

assert.Len(t, exp.GetSpans(), i+1)
}
}