From c97924f7d4ab74f3d908ea91741f88099788b1a2 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Wed, 19 Oct 2022 11:06:56 -0400 Subject: [PATCH 01/10] Add warn logging when bsp drops spans --- CHANGELOG.md | 1 + internal/global/internal_logging.go | 7 ++++ sdk/trace/batch_span_processor.go | 14 +++++-- sdk/trace/batch_span_processor_test.go | 52 ++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87d13296190..35ed58481d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Prometheus exporter will register with a prometheus registerer on creation, there are options to control this. (#3239) +- Batch span processor will now keep track of known dropped span count and log a warning during batch export on newly dropped spans. (#3289) ### Changed diff --git a/internal/global/internal_logging.go b/internal/global/internal_logging.go index ccb3258711a..d0953e206ee 100644 --- a/internal/global/internal_logging.go +++ b/internal/global/internal_logging.go @@ -48,6 +48,13 @@ func Info(msg string, keysAndValues ...interface{}) { globalLogger.V(1).Info(msg, keysAndValues...) } +// Warn prints messages about potential issues with the API or SDK. +func Warn(msg string, keysAndValues ...interface{}) { + globalLoggerLock.RLock() + defer globalLoggerLock.RUnlock() + globalLogger.V(0).Info(msg, keysAndValues...) +} + // Error prints messages about exceptional states of the API or SDK. func Error(err error, msg string, keysAndValues ...interface{}) { globalLoggerLock.RLock() diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index a2d7db49001..c6dbed16050 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -75,8 +75,9 @@ type batchSpanProcessor struct { e SpanExporter o BatchSpanProcessorOptions - queue chan ReadOnlySpan - dropped uint32 + queue chan ReadOnlySpan + dropped uint32 + knownDropped uint32 batch []ReadOnlySpan batchMutex sync.Mutex @@ -265,7 +266,14 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { } if l := len(bsp.batch); l > 0 { - global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped)) + global.Debug("exporting spans", "count", len(bsp.batch)) + + dropped := atomic.LoadUint32(&bsp.dropped) + knownDropped := atomic.SwapUint32(&bsp.knownDropped, dropped) + if dropped > knownDropped { + global.Warn("dropped spans", "total_dropped", dropped, "known_dropped", knownDropped) + } + err := bsp.e.ExportSpans(ctx, bsp.batch) // A new batch is always created after exporting, even if the batch failed to be exported. diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index a033b6a0082..af135936c5e 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,7 +19,11 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" + "go.opentelemetry.io/otel" "os" + "regexp" "sync" "testing" "time" @@ -49,6 +53,16 @@ type testBatchExporter struct { err error } +type logCounter struct { + logr.LogSink + logs []string +} + +func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) { + l.logs = append(l.logs, fmt.Sprint(msg, keysAndValues)) + l.LogSink.Info(level, msg, keysAndValues...) +} + func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { t.mu.Lock() defer t.mu.Unlock() @@ -350,6 +364,12 @@ func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace return sdktrace.NewBatchSpanProcessor(te, options...) } +func createAndRegisterBatchSPNonBlocking(option testOption, te *testBatchExporter) sdktrace.SpanProcessor { + // Blocking span processor should be preferred to avoid flaky test + // To test dropped spans warn logging the blocking span processor bypasses drop logic + return sdktrace.NewBatchSpanProcessor(te, option.o...) +} + func generateSpan(t *testing.T, tr trace.Tracer, option testOption) { sc := getSpanContext() @@ -474,6 +494,38 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { assert.NoError(t, err) } +func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { + tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 0}) + l := &logCounter{LogSink: tLog.GetSink()} + otel.SetLogger(logr.New(l)) + global.SetLogger(logr.New(l)) + + te := testBatchExporter{} + tp := basicTracerProvider(t) + option := testOption{ + name: "default BatchSpanProcessorOptions", + o: []sdktrace.BatchSpanProcessorOption{ + sdktrace.WithMaxQueueSize(0), + sdktrace.WithMaxExportBatchSize(10), + }, + genNumSpans: 100, + } + ssp := createAndRegisterBatchSPNonBlocking(option, &te) + if ssp == nil { + t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) + } + tp.RegisterSpanProcessor(ssp) + tr := tp.Tracer("BatchSpanProcessorWithOption") + + // Force flush any held span batches + generateSpan(t, tr, option) + ssp.ForceFlush(context.Background()) + + assert.Equal(t, len(l.logs), 1, "expected log message") + droppedLogs := l.logs[0] + assert.Regexp(t, regexp.MustCompile(`dropped spans\[total_dropped \d+ known_dropped \d+\]`), droppedLogs) +} + func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) { te := testBatchExporter{ errors: []error{errors.New("fail to export")}, From c3d105a25ab8ebd22a804fd257ae997187edacd1 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Fri, 21 Oct 2022 11:29:09 -0400 Subject: [PATCH 02/10] move change to unreleased --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a024fc89fbf..e89f403c0f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +- Batch span processor will now keep track of known dropped span count and log a warning during batch export on newly dropped spans. (#3289) + ### Fixed - The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369) @@ -21,7 +23,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm A non-default registerer can be used by passing the `WithRegisterer` option. (#3239) - Added the `WithAggregationSelector` option to the `go.opentelemetry.io/otel/exporters/prometheus` package to change the default `AggregationSelector` used. (#3341) - The Prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus` converts the `Resource` associated with metric exports into a `target_info` metric. (#3285) -- Batch span processor will now keep track of known dropped span count and log a warning during batch export on newly dropped spans. (#3289) ### Changed From a754299efccf58e4a238dbf1e2d95c5733d07916 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Tue, 25 Oct 2022 15:10:20 -0400 Subject: [PATCH 03/10] Update changelog, fix lint, remove warn log switch to info --- CHANGELOG.md | 4 +++- internal/global/internal_logging.go | 7 ------- sdk/trace/batch_span_processor.go | 3 ++- sdk/trace/batch_span_processor_test.go | 9 ++++----- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e89f403c0f5..e22c1b57eff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] -- Batch span processor will now keep track of known dropped span count and log a warning during batch export on newly dropped spans. (#3289) +### Added + +- The `go.opentelemetry.io/sdk/trace/batch_span_processor` will now keep track of known dropped span count and log during batch export on newly dropped spans. (#3289) ### Fixed diff --git a/internal/global/internal_logging.go b/internal/global/internal_logging.go index d0953e206ee..ccb3258711a 100644 --- a/internal/global/internal_logging.go +++ b/internal/global/internal_logging.go @@ -48,13 +48,6 @@ func Info(msg string, keysAndValues ...interface{}) { globalLogger.V(1).Info(msg, keysAndValues...) } -// Warn prints messages about potential issues with the API or SDK. -func Warn(msg string, keysAndValues ...interface{}) { - globalLoggerLock.RLock() - defer globalLoggerLock.RUnlock() - globalLogger.V(0).Info(msg, keysAndValues...) -} - // Error prints messages about exceptional states of the API or SDK. func Error(err error, msg string, keysAndValues ...interface{}) { globalLoggerLock.RLock() diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index c6dbed16050..66a4178f7a5 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -271,7 +271,8 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { dropped := atomic.LoadUint32(&bsp.dropped) knownDropped := atomic.SwapUint32(&bsp.knownDropped, dropped) if dropped > knownDropped { - global.Warn("dropped spans", "total_dropped", dropped, "known_dropped", knownDropped) + droppedThisBatch := dropped - knownDropped + global.Info("dropped spans", "total", dropped, "this_batch", droppedThisBatch) } err := bsp.e.ExportSpans(ctx, bsp.batch) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index af135936c5e..aa4114fb091 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,22 +19,21 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/go-logr/logr" - "github.com/go-logr/logr/testr" - "go.opentelemetry.io/otel" "os" "regexp" "sync" "testing" "time" - ottest "go.opentelemetry.io/otel/internal/internaltest" - + "github.com/go-logr/logr" "github.com/go-logr/logr/funcr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" + ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/sdk/internal/env" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" From 863028925483cd9346b1abe6bcaa7194fff8f72f Mon Sep 17 00:00:00 2001 From: prodion23 Date: Tue, 25 Oct 2022 15:13:22 -0400 Subject: [PATCH 04/10] Update test --- sdk/trace/batch_span_processor_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index aa4114fb091..75ba6e60bbc 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -494,7 +494,7 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { } func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { - tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 0}) + tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 1}) l := &logCounter{LogSink: tLog.GetSink()} otel.SetLogger(logr.New(l)) global.SetLogger(logr.New(l)) @@ -520,9 +520,9 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { generateSpan(t, tr, option) ssp.ForceFlush(context.Background()) - assert.Equal(t, len(l.logs), 1, "expected log message") - droppedLogs := l.logs[0] - assert.Regexp(t, regexp.MustCompile(`dropped spans\[total_dropped \d+ known_dropped \d+\]`), droppedLogs) + assert.Equal(t, len(l.logs), 3, "expected log message") + droppedLogs := l.logs[2] + assert.Regexp(t, regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`), droppedLogs) } func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) { From 5903e9cec9cafe62720290f98e57a3489aa72782 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Wed, 2 Nov 2022 10:52:30 -0400 Subject: [PATCH 05/10] Pr feedback, update test with logger reset for race condition --- CHANGELOG.md | 2 +- sdk/trace/batch_span_processor.go | 12 ++++++------ sdk/trace/batch_span_processor_test.go | 18 ++++++++++++++---- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e22c1b57eff..b0ba3a1dad4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- The `go.opentelemetry.io/sdk/trace/batch_span_processor` will now keep track of known dropped span count and log during batch export on newly dropped spans. (#3289) +- The `BatchSpanProcessor` from `go.opentelemetry.io/otel/sdk/trace` logs the spans dropped in a batch every export with verbosity `1`. (#3289) ### Fixed diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 66a4178f7a5..127dad79dd0 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -75,9 +75,9 @@ type batchSpanProcessor struct { e SpanExporter o BatchSpanProcessorOptions - queue chan ReadOnlySpan - dropped uint32 - knownDropped uint32 + queue chan ReadOnlySpan + dropped uint32 + reportedDropped uint32 batch []ReadOnlySpan batchMutex sync.Mutex @@ -269,9 +269,9 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { global.Debug("exporting spans", "count", len(bsp.batch)) dropped := atomic.LoadUint32(&bsp.dropped) - knownDropped := atomic.SwapUint32(&bsp.knownDropped, dropped) - if dropped > knownDropped { - droppedThisBatch := dropped - knownDropped + reportedDropped := atomic.SwapUint32(&bsp.reportedDropped, dropped) + if dropped > reportedDropped { + droppedThisBatch := dropped - reportedDropped global.Info("dropped spans", "total", dropped, "this_batch", droppedThisBatch) } diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 75ba6e60bbc..68745c7ea47 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,6 +19,8 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/go-logr/stdr" + "log" "os" "regexp" "sync" @@ -497,7 +499,6 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 1}) l := &logCounter{LogSink: tLog.GetSink()} otel.SetLogger(logr.New(l)) - global.SetLogger(logr.New(l)) te := testBatchExporter{} tp := basicTracerProvider(t) @@ -519,10 +520,19 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { // Force flush any held span batches generateSpan(t, tr, option) ssp.ForceFlush(context.Background()) + reportedDropCount := 0 + logMatch := regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`) + for _, v := range l.logs { + match := logMatch.MatchString(v) + if match { + reportedDropCount++ + } + } + assert.GreaterOrEqual(t, reportedDropCount, 1) - assert.Equal(t, len(l.logs), 3, "expected log message") - droppedLogs := l.logs[2] - assert.Regexp(t, regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`), droppedLogs) + // have to reset the logger or other tests will fail during make test-race + logger := stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)) + otel.SetLogger(logger) } func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) { From 5fba4069484ad6b0b6d27c25d811734d96c6d832 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Wed, 2 Nov 2022 13:43:00 -0400 Subject: [PATCH 06/10] pr feedback, lint --- sdk/trace/batch_span_processor_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 68745c7ea47..eb9c5b6679b 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,7 +19,6 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/go-logr/stdr" "log" "os" "regexp" @@ -30,6 +29,7 @@ import ( "github.com/go-logr/logr" "github.com/go-logr/logr/funcr" "github.com/go-logr/logr/testr" + "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -365,12 +365,6 @@ func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace return sdktrace.NewBatchSpanProcessor(te, options...) } -func createAndRegisterBatchSPNonBlocking(option testOption, te *testBatchExporter) sdktrace.SpanProcessor { - // Blocking span processor should be preferred to avoid flaky test - // To test dropped spans warn logging the blocking span processor bypasses drop logic - return sdktrace.NewBatchSpanProcessor(te, option.o...) -} - func generateSpan(t *testing.T, tr trace.Tracer, option testOption) { sc := getSpanContext() @@ -510,15 +504,15 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { }, genNumSpans: 100, } - ssp := createAndRegisterBatchSPNonBlocking(option, &te) + ssp := sdktrace.NewBatchSpanProcessor(&te, option.o...) if ssp == nil { t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) } tp.RegisterSpanProcessor(ssp) tr := tp.Tracer("BatchSpanProcessorWithOption") - // Force flush any held span batches generateSpan(t, tr, option) + // Force flush any held span batches ssp.ForceFlush(context.Background()) reportedDropCount := 0 logMatch := regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`) @@ -530,7 +524,7 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { } assert.GreaterOrEqual(t, reportedDropCount, 1) - // have to reset the logger or other tests will fail during make test-race + // Reset the global logger so other tests are not impacted logger := stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)) otel.SetLogger(logger) } From f344e9594d351d0f1af995f70ce04c4671597fd7 Mon Sep 17 00:00:00 2001 From: prodion23 Date: Wed, 2 Nov 2022 14:19:13 -0400 Subject: [PATCH 07/10] go mod logr --- sdk/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/go.mod b/sdk/go.mod index 022ad8ce1f2..a902115d992 100644 --- a/sdk/go.mod +++ b/sdk/go.mod @@ -6,6 +6,7 @@ replace go.opentelemetry.io/otel => ../ require ( github.com/go-logr/logr v1.2.3 + github.com/go-logr/stdr v1.2.2 github.com/google/go-cmp v0.5.9 github.com/stretchr/testify v1.8.1 go.opentelemetry.io/otel v1.11.1 @@ -15,7 +16,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) From e906ef807e27a895a997d39712a6021cdec51a51 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Wed, 30 Nov 2022 14:02:50 -0600 Subject: [PATCH 08/10] Updated test to make it more robust. Signed-off-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> --- sdk/trace/batch_span_processor_test.go | 55 ++++++++++++++++++++------ 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index eb9c5b6679b..113f06d21cf 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -489,40 +489,69 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { assert.NoError(t, err) } +// This Exporter will never return +type blockingExporter struct { + block chan struct{} + close sync.Once +} + +func (e *blockingExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + <-e.block + return nil +} +func (e *blockingExporter) Shutdown(ctx context.Context) error { + e.close.Do(func() { + close(e.block) + }) + return nil +} +func (e *blockingExporter) unblock() { + select { + case e.block <- struct{}{}: + default: + } +} + func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 1}) l := &logCounter{LogSink: tLog.GetSink()} otel.SetLogger(logr.New(l)) - te := testBatchExporter{} + te := &blockingExporter{block: make(chan struct{})} tp := basicTracerProvider(t) option := testOption{ name: "default BatchSpanProcessorOptions", o: []sdktrace.BatchSpanProcessorOption{ sdktrace.WithMaxQueueSize(0), - sdktrace.WithMaxExportBatchSize(10), + sdktrace.WithMaxExportBatchSize(1), }, genNumSpans: 100, } - ssp := sdktrace.NewBatchSpanProcessor(&te, option.o...) + ssp := sdktrace.NewBatchSpanProcessor(te, option.o...) if ssp == nil { t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) } tp.RegisterSpanProcessor(ssp) tr := tp.Tracer("BatchSpanProcessorWithOption") - generateSpan(t, tr, option) - // Force flush any held span batches - ssp.ForceFlush(context.Background()) - reportedDropCount := 0 logMatch := regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`) - for _, v := range l.logs { - match := logMatch.MatchString(v) - if match { - reportedDropCount++ + assert.Eventually(t, func() bool { + generateSpan(t, tr, option) + te.unblock() + reportedDropCount := 0 + + for _, v := range l.logs { + match := logMatch.MatchString(v) + if match { + reportedDropCount++ + } } - } - assert.GreaterOrEqual(t, reportedDropCount, 1) + + return reportedDropCount >= 1 + }, time.Second, time.Millisecond) + te.Shutdown(context.Background()) + + ssp.Shutdown(context.Background()) // Reset the global logger so other tests are not impacted logger := stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)) From 948dd869e09c4de425b9c050c256c780dab27a7b Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Wed, 30 Nov 2022 14:07:01 -0600 Subject: [PATCH 09/10] Fixes for lint Signed-off-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> --- sdk/trace/batch_span_processor_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 113f06d21cf..06c545ed7ef 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -489,7 +489,6 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { assert.NoError(t, err) } -// This Exporter will never return type blockingExporter struct { block chan struct{} close sync.Once @@ -549,9 +548,9 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { return reportedDropCount >= 1 }, time.Second, time.Millisecond) - te.Shutdown(context.Background()) + _ = te.Shutdown(context.Background()) - ssp.Shutdown(context.Background()) + _ = ssp.Shutdown(context.Background()) // Reset the global logger so other tests are not impacted logger := stdr.New(log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)) From 9af553aade07a09b85f983c7d56c637230bcfb51 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Fri, 2 Dec 2022 10:21:40 -0600 Subject: [PATCH 10/10] Made the testing logger thread safe. Moved the functionality of the testing stuckExporter into the blockingExporter. --- sdk/trace/batch_span_processor_test.go | 86 ++++++++++++++------------ 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 06c545ed7ef..480d47b3083 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -23,12 +23,12 @@ import ( "os" "regexp" "sync" + "sync/atomic" "testing" "time" "github.com/go-logr/logr" "github.com/go-logr/logr/funcr" - "github.com/go-logr/logr/testr" "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,16 +54,6 @@ type testBatchExporter struct { err error } -type logCounter struct { - logr.LogSink - logs []string -} - -func (l *logCounter) Info(level int, msg string, keysAndValues ...interface{}) { - l.logs = append(l.logs, fmt.Sprint(msg, keysAndValues)) - l.LogSink.Info(level, msg, keysAndValues...) -} - func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { t.mu.Lock() defer t.mu.Unlock() @@ -328,19 +318,8 @@ func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) { } } -type stuckExporter struct { - testBatchExporter -} - -// ExportSpans waits for ctx to expire and returns that error. -func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { - <-ctx.Done() - e.err = ctx.Err() - return ctx.Err() -} - func TestBatchSpanProcessorExportTimeout(t *testing.T) { - exp := new(stuckExporter) + exp := newBlockingExporter() bsp := sdktrace.NewBatchSpanProcessor( exp, // Set a non-zero export timeout so a deadline is set. @@ -489,16 +468,30 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { assert.NoError(t, err) } +// blockingExporter will block on ExportSpans until one of the following happens: +// a) The context is Canceled. +// b) unblock() is called. +// c) The exporter is ShutDown(). type blockingExporter struct { + testBatchExporter block chan struct{} close sync.Once } -func (e *blockingExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { - <-e.block +func newBlockingExporter() *blockingExporter { + return &blockingExporter{block: make(chan struct{})} +} + +func (e *blockingExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { + select { + case <-e.block: + case <-ctx.Done(): + e.err = ctx.Err() + return ctx.Err() + } return nil } -func (e *blockingExporter) Shutdown(ctx context.Context) error { +func (e *blockingExporter) Shutdown(_ context.Context) error { e.close.Do(func() { close(e.block) }) @@ -511,12 +504,30 @@ func (e *blockingExporter) unblock() { } } +type matchingLogger struct { + // This can be an atomic.Bool when we drop 1.18 support + _found atomic.Value + regex *regexp.Regexp +} + +func (l *matchingLogger) Logger() logr.Logger { + l._found.Store(false) + return funcr.New(func(prefix, args string) { + if l.regex.MatchString(args) { + l._found.Store(true) + } + }, funcr.Options{Verbosity: 10}) +} + +func (l *matchingLogger) found() bool { + return l._found.Load().(bool) +} + func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { - tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 1}) - l := &logCounter{LogSink: tLog.GetSink()} - otel.SetLogger(logr.New(l)) + tLog := &matchingLogger{regex: regexp.MustCompile(`"msg"="dropped spans" "total"=\d+ "this_batch"=\d+`)} + otel.SetLogger(tLog.Logger()) - te := &blockingExporter{block: make(chan struct{})} + te := newBlockingExporter() tp := basicTracerProvider(t) option := testOption{ name: "default BatchSpanProcessorOptions", @@ -533,23 +544,16 @@ func TestBatchSpanProcessorLogsWarningOnNewDropSpans(t *testing.T) { tp.RegisterSpanProcessor(ssp) tr := tp.Tracer("BatchSpanProcessorWithOption") - logMatch := regexp.MustCompile(`dropped spans\[total \d+ this_batch \d+\]`) assert.Eventually(t, func() bool { generateSpan(t, tr, option) te.unblock() - reportedDropCount := 0 - for _, v := range l.logs { - match := logMatch.MatchString(v) - if match { - reportedDropCount++ - } - } - - return reportedDropCount >= 1 + return tLog.found() }, time.Second, time.Millisecond) - _ = te.Shutdown(context.Background()) + // We shut down the exporter to clear any blocked ExportSpans(). + // If we don't shut down the exporter first the SpanProcessor will block waiting for Exports to complete. + _ = te.Shutdown(context.Background()) _ = ssp.Shutdown(context.Background()) // Reset the global logger so other tests are not impacted