Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty queued spans when ForceFlush called #2335

Merged
merged 9 commits into from Nov 5, 2021
Merged
23 changes: 23 additions & 0 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -153,10 +153,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}

type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
select {
case bsp.queue <- forceFlushSpan{flushed: flushCh}:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use the enque method here instead to handle the shutdown case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though this blocks on a full queue ... It might make sense to copy some of the logic from that method to here to handle the case where shutdown has already occurred or is concurrently happening.

case <-ctx.Done():
return ctx.Err()
}

select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}

wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
Expand Down Expand Up @@ -248,6 +267,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
return
pkwarren marked this conversation as resolved.
Show resolved Hide resolved
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
Expand Down
22 changes: 22 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -457,3 +458,24 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}

func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)

tracer := tp.Tracer("tracer")

_, span := tracer.Start(ctx, "span")
span.End()

err := tp.ForceFlush(ctx)
assert.NoError(t, err)

// Expect well defined behavior due to calling ForceFlush
assert.Len(t, exp.GetSpans(), 1)
pkwarren marked this conversation as resolved.
Show resolved Hide resolved
}