Fix panic, deadlock and race in BatchSpanProcessor #4353

Merged · 9 commits · Jul 25, 2023
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
- Fix possible deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)

## [1.16.0/0.39.0] 2023-05-18

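The CHANGELOG entry above summarizes the user-visible effect: ending spans, ForceFlush, and Shutdown may now run concurrently without panicking, deadlocking, or racing. A minimal sketch of the kind of interleaving this hardens, assuming the stdouttrace exporter and an otherwise default provider (the exporter choice and goroutine counts are illustrative only, not part of this change):

package main

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.Background()

	exp, err := stdouttrace.New()
	if err != nil {
		panic(err)
	}

	// WithBatcher wraps the exporter in a BatchSpanProcessor.
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	tr := tp.Tracer("example")

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, span := tr.Start(ctx, "work")
			span.End() // enqueues the span on the batch processor
		}()
	}

	// Flushing and shutting down concurrently with span creation previously
	// risked a "send on closed channel" panic or a deadlock.
	wg.Add(2)
	go func() { defer wg.Done(); _ = tp.ForceFlush(ctx) }()
	go func() { defer wg.Done(); _ = tp.Shutdown(ctx) }()

	wg.Wait()
}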
62 changes: 20 additions & 42 deletions sdk/trace/batch_span_processor.go
@@ -16,7 +16,6 @@

import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -84,6 +83,7 @@
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopped atomic.Bool
}

var _ SpanProcessor = (*batchSpanProcessor)(nil)
@@ -137,6 +137,11 @@

// OnEnd method enqueues a ReadOnlySpan for later processing.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
// Do not enqueue spans after Shutdown.
if bsp.stopped.Load() {
return
}

// Do not enqueue spans if we are just going to drop them.
if bsp.e == nil {
return
@@ -149,6 +154,7 @@
func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
bsp.stopped.Store(true)
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
@@ -181,11 +187,19 @@

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
// Do nothing after Shutdown.
if bsp.stopped.Load() {
return nil
}

var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
select {
case <-bsp.stopCh:
// The batchSpanProcessor is Shutdown.
return nil
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
@@ -326,13 +340,6 @@
for {
select {
case sd := <-bsp.queue:
if sd == nil {
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
return
}

bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
@@ -344,7 +351,11 @@
}
}
default:
close(bsp.queue)
// There are no more enqueued spans. Make final export.
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
return
}
}
}
@@ -358,34 +369,11 @@
}
}

func recoverSendOnClosedChan() {
x := recover()
switch err := x.(type) {
case nil:
return
case runtime.Error:
if err.Error() == "send on closed channel" {
return
}
}
panic(x)
}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}

// This ensures the bsp.queue<- below does not panic as the
// processor shuts down.
defer recoverSendOnClosedChan()

select {
case <-bsp.stopCh:
return false
default:
}

select {
case bsp.queue <- sd:
return true
@@ -399,16 +387,6 @@
return false
}

// This ensures the bsp.queue<- below does not panic as the
// processor shuts down.
defer recoverSendOnClosedChan()

select {
case <-bsp.stopCh:
return false
default:
}

select {
case bsp.queue <- sd:
return true
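Mechanically, the diff above replaces the close-the-queue-and-recover approach with a common Go shutdown pattern: the work channel is never closed, producers are gated by an atomic.Bool that Shutdown flips exactly once inside sync.Once, and draining proceeds until the queue is empty instead of relying on a closed-channel sentinel. A self-contained sketch of that pattern, with illustrative names rather than the SDK's own:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processor gates enqueues with an atomic flag instead of closing its queue,
// so a concurrent Enqueue can never panic with "send on closed channel".
type processor struct {
	queue    chan int
	stopCh   chan struct{}
	stopOnce sync.Once
	stopped  atomic.Bool
}

func newProcessor() *processor {
	return &processor{queue: make(chan int, 16), stopCh: make(chan struct{})}
}

func (p *processor) Enqueue(v int) bool {
	if p.stopped.Load() { // cheap gate; no lock and no recover needed
		return false
	}
	select {
	case p.queue <- v:
		return true
	case <-p.stopCh: // unblocks producers stuck on a full queue during Shutdown
		return false
	}
}

func (p *processor) Shutdown() {
	p.stopOnce.Do(func() {
		p.stopped.Store(true) // stop accepting new work first
		close(p.stopCh)       // then wake any blocked producers
		// Drain what is already queued without ever closing p.queue.
		for {
			select {
			case v := <-p.queue:
				fmt.Println("export", v)
			default:
				return
			}
		}
	})
}

func main() {
	p := newProcessor()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); p.Enqueue(i) }(i)
	}
	wg.Add(1)
	go func() { defer wg.Done(); p.Shutdown() }()
	wg.Wait()
	p.Shutdown() // safe to call again; sync.Once makes it a no-op
}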
43 changes: 43 additions & 0 deletions sdk/trace/batch_span_processor_test.go
@@ -594,6 +594,49 @@ func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
}
}

func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
ctx := context.Background()
var bp testBatchExporter
bsp := sdktrace.NewBatchSpanProcessor(&bp)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
generateSpan(t, tr, testOption{genNumSpans: 1})
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = bsp.ForceFlush(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = bsp.Shutdown(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = tp.ForceFlush(ctx)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = tp.Shutdown(ctx)
}()

wg.Wait()
}

func BenchmarkSpanProcessor(b *testing.B) {
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(
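The new TestBatchSpanProcessorConcurrentSafe only shows its worth under the race detector. Assuming the usual layout of this repository, where go.opentelemetry.io/otel/sdk is its own module, it can be run with something like `go test -race -run TestBatchSpanProcessorConcurrentSafe ./trace/...` from the sdk directory.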