Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty queued spans when ForceFlush called #2335

Merged
merged 9 commits into from Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
- The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271)
- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335)

### Added

Expand Down
44 changes: 39 additions & 5 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// Defaults for BatchSpanProcessorOptions.
Expand Down Expand Up @@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}

type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}

func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using enqueue requires that the span being queued is marked as sampled.

}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) {
select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}

wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
Expand Down Expand Up @@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
Expand Down Expand Up @@ -296,8 +320,12 @@ func (bsp *batchSpanProcessor) drainQueue() {
}

func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to work in the ForceFlush method, it was needed to pass a context (to allow cancellation).

}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool {
if !sd.SpanContext().IsSampled() {
return
return false
}

// This ensures the bsp.queue<- below does not panic as the
Expand All @@ -317,18 +345,24 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {

select {
case <-bsp.stopCh:
return
return false
default:
}

if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
return
if block {
select {
case bsp.queue <- sd:
return true
case <-ctx.Done():
return false
}
}

select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}
25 changes: 25 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -18,13 +18,16 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/trace/tracetest"

"go.opentelemetry.io/otel/trace"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -457,3 +460,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}

func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)

tracer := tp.Tracer("tracer")

for i := 0; i < 10; i++ {
_, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
span.End()

err := tp.ForceFlush(ctx)
assert.NoError(t, err)

assert.Len(t, exp.GetSpans(), i+1)
}
}