Fix panic, deadlock and race in BatchSpanProcessor (#4353)

pellared committed Jul 25, 2023
1 parent e26d8bd commit 088ac8e

Showing 3 changed files with 67 additions and 40 deletions.
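
Background on the panic named in the commit title: the processor used to close its span queue channel during drain, and Go panics on any send to a closed channel, so a goroutine still enqueuing spans could crash the process. The snippet below is a minimal standalone sketch (not code from this repository) of that failure mode, including the string-matching recover that the removed recoverSendOnClosedChan helper relied on.

package main

import "fmt"

func main() {
	queue := make(chan int, 1)
	close(queue)

	// Swallow the runtime panic, roughly as the removed
	// recoverSendOnClosedChan helper did.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // recovered: send on closed channel
		}
	}()

	queue <- 1 // panics: send on closed channel
}

Recovering here only hides the symptom; the race between the closing goroutine and the senders remains, which is why the commit stops closing the queue altogether.
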
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
 - Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
 - Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
+- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
 
 ## [1.16.0/0.39.0] 2023-05-18

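The deadlock part of the fix shows up in the ForceFlush hunk below: a caller waiting only on its flush channel could block forever when a concurrent Shutdown drained the queue without ever processing the flush marker. The following is a condensed, hypothetical distillation of the new wait (the names are illustrative, not the SDK's API), assuming Shutdown closes stopCh:

package sketch

import "context"

// waitForFlush mirrors the select added to ForceFlush in this commit:
// a closed stopCh now unblocks the waiter instead of leaving it stuck
// on flushCh after Shutdown has already drained the queue.
func waitForFlush(ctx context.Context, flushCh, stopCh <-chan struct{}) error {
	select {
	case <-stopCh:
		return nil // the processor shut down; nothing is left to flush
	case <-flushCh:
		return nil // spans queued before the flush marker were exported
	case <-ctx.Done():
		return ctx.Err() // the caller's deadline or cancellation fired
	}
}
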
63 changes: 23 additions & 40 deletions sdk/trace/batch_span_processor.go
@@ -16,7 +16,6 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
 
 import (
 	"context"
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -84,6 +83,7 @@ type batchSpanProcessor struct {
 	stopWait sync.WaitGroup
 	stopOnce sync.Once
 	stopCh   chan struct{}
+	stopped  atomic.Bool
 }
 
 var _ SpanProcessor = (*batchSpanProcessor)(nil)
@@ -137,6 +137,11 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
 
 // OnEnd method enqueues a ReadOnlySpan for later processing.
 func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
+	// Do not enqueue spans after Shutdown.
+	if bsp.stopped.Load() {
+		return
+	}
+
 	// Do not enqueue spans if we are just going to drop them.
 	if bsp.e == nil {
 		return
@@ -149,6 +154,7 @@ func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
 func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
 	var err error
 	bsp.stopOnce.Do(func() {
+		bsp.stopped.Store(true)
 		wait := make(chan struct{})
 		go func() {
 			close(bsp.stopCh)
@@ -181,11 +187,19 @@ func (f forceFlushSpan) SpanContext() trace.SpanContext {
 
 // ForceFlush exports all ended spans that have not yet been exported.
 func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
+	// Do nothing after Shutdown.
+	if bsp.stopped.Load() {
+		return nil
+	}
+
 	var err error
 	if bsp.e != nil {
 		flushCh := make(chan struct{})
 		if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
 			select {
+			case <-bsp.stopCh:
+				// The batchSpanProcessor is Shutdown.
+				return nil
 			case <-flushCh:
 				// Processed any items in queue prior to ForceFlush being called
 			case <-ctx.Done():
@@ -326,11 +340,9 @@ func (bsp *batchSpanProcessor) drainQueue() {
 	for {
 		select {
 		case sd := <-bsp.queue:
-			if sd == nil {
-				if err := bsp.exportSpans(ctx); err != nil {
-					otel.Handle(err)
-				}
-				return
+			if _, ok := sd.(forceFlushSpan); ok {
+				// Ignore flush requests as they are not valid spans.
+				continue
 			}
 
 			bsp.batchMutex.Lock()
@@ -344,7 +356,11 @@ }
 				}
 			}
 		default:
-			close(bsp.queue)
+			// There are no more enqueued spans. Make final export.
+			if err := bsp.exportSpans(ctx); err != nil {
+				otel.Handle(err)
+			}
+			return
 		}
 	}
 }
@@ -358,34 +374,11 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
 	}
 }
 
-func recoverSendOnClosedChan() {
-	x := recover()
-	switch err := x.(type) {
-	case nil:
-		return
-	case runtime.Error:
-		if err.Error() == "send on closed channel" {
-			return
-		}
-	}
-	panic(x)
-}
-
 func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
 	if !sd.SpanContext().IsSampled() {
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
@@ -399,16 +392,6 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
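
Taken together, the processor changes replace the recover-based guard with a shutdown gate: the queue channel is never closed, so senders cannot panic; producers are instead turned away by an atomic flag and, if blocked on a full queue, woken by the separate stopCh signal channel. A self-contained sketch of that pattern under hypothetical names (the shape of the technique, not the SDK's API):

package sketch

import (
	"sync"
	"sync/atomic"
)

type processor struct {
	queue    chan int
	stopCh   chan struct{}
	stopped  atomic.Bool
	stopOnce sync.Once
}

// enqueue checks the flag first, then selects on stopCh so a send
// blocked on a full queue cannot outlive shutdown. The queue channel
// itself is never closed, so no send can panic.
func (p *processor) enqueue(v int) bool {
	if p.stopped.Load() {
		return false // work submitted after shutdown began is rejected
	}
	select {
	case p.queue <- v:
		return true
	case <-p.stopCh:
		return false // shutdown raced with this enqueue; drop the item
	}
}

func (p *processor) shutdown() {
	p.stopOnce.Do(func() {
		p.stopped.Store(true) // turn new producers away at the gate
		close(p.stopCh)       // wake producers blocked on a full queue
	})
}
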
43 changes: 43 additions & 0 deletions sdk/trace/batch_span_processor_test.go
@@ -594,6 +594,49 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
 	}
 }
 
+func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
+	ctx := context.Background()
+	var bp testBatchExporter
+	bsp := sdktrace.NewBatchSpanProcessor(&bp)
+	tp := basicTracerProvider(t)
+	tp.RegisterSpanProcessor(bsp)
+	tr := tp.Tracer(t.Name())
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		generateSpan(t, tr, testOption{genNumSpans: 1})
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = bsp.ForceFlush(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = bsp.Shutdown(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = tp.ForceFlush(ctx)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		_ = tp.Shutdown(ctx)
+	}()
+
+	wg.Wait()
+}
+
 func BenchmarkSpanProcessor(b *testing.B) {
 	tp := sdktrace.NewTracerProvider(
 		sdktrace.WithBatcher(
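
The new test is only meaningful under the race detector, which is the tool that catches this class of bug. Assuming a standard Go toolchain, a focused run from within the sdk module of a repository checkout would look something like:

go test -race -run TestBatchSpanProcessorConcurrentSafe ./trace/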