Skip to content

Commit

Permalink
Add env support for batch span processor (#2515)
Browse files Browse the repository at this point in the history
* Add env support for batch span processor

* Update changelog

* lint
  • Loading branch information
sincejune committed Jan 28, 2022
1 parent d3bb038 commit 3cf35bd
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Support `OTEL_EXPORTER_ZIPKIN_ENDPOINT` env to specify zipkin collector endpoint (#2490)
- Log the configuration of TracerProviders, and Tracers for debugging. To enable use a logger with Verbosity (V level) >=1
- Added environment variables for: `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, `OTEL_BSP_MAX_QUEUE_SIZE` and `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` (#2515)

### Changed

Expand Down
23 changes: 17 additions & 6 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -29,8 +29,8 @@ import (
// Defaults for BatchSpanProcessorOptions.
const (
DefaultMaxQueueSize = 2048
DefaultBatchTimeout = 5000 * time.Millisecond
DefaultExportTimeout = 30000 * time.Millisecond
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)

Expand Down Expand Up @@ -89,11 +89,22 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil)
//
// If the exporter is nil, the span processor will preform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
maxQueueSize := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, DefaultMaxQueueSize)
maxExportBatchSize := intEnvOr(EnvBatchSpanProcessorMaxExportBatchSize, DefaultMaxExportBatchSize)

if maxExportBatchSize > maxQueueSize {
if DefaultMaxExportBatchSize > maxQueueSize {
maxExportBatchSize = maxQueueSize
} else {
maxExportBatchSize = DefaultMaxExportBatchSize
}
}

o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout,
ExportTimeout: DefaultExportTimeout,
MaxQueueSize: DefaultMaxQueueSize,
MaxExportBatchSize: DefaultMaxExportBatchSize,
BatchTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorScheduleDelay, DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorExportTimeout, DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
for _, opt := range options {
opt(&o)
Expand Down
83 changes: 83 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -19,10 +19,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

ottest "go.opentelemetry.io/otel/internal/internaltest"

"github.com/go-logr/logr/funcr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,6 +118,7 @@ type testOption struct {
wantBatchCount int
genNumSpans int
parallel bool
envs map[string]string
}

func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
Expand Down Expand Up @@ -221,6 +225,85 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
}
}

func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) {
options := []testOption{
{
name: "BatchSpanProcessorEnvOptions - Basic",
wantNumSpans: 2053,
wantBatchCount: 1,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "5000",
},
},
{
name: "BatchSpanProcessorEnvOptions - A lager max export batch size than queue size",
wantNumSpans: 2053,
wantBatchCount: 4,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000",
},
},
{
name: "BatchSpanProcessorEnvOptions - A lage max export batch size with a small queue size",
wantNumSpans: 2053,
wantBatchCount: 42,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "50",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000",
},
},
}

envStore := ottest.NewEnvStore()
envStore.Record(sdktrace.EnvBatchSpanProcessorScheduleDelay)
envStore.Record(sdktrace.EnvBatchSpanProcessorExportTimeout)
envStore.Record(sdktrace.EnvBatchSpanProcessorMaxQueueSize)
envStore.Record(sdktrace.EnvBatchSpanProcessorMaxExportBatchSize)

defer func() {
require.NoError(t, envStore.Restore())
}()

for _, option := range options {
t.Run(option.name, func(t *testing.T) {
for k, v := range option.envs {
require.NoError(t, os.Setenv(k, v))
}

te := testBatchExporter{}
tp := basicTracerProvider(t)
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOptions")

generateSpan(t, option.parallel, tr, option)

tp.UnregisterSpanProcessor(ssp)

gotNumOfSpans := te.len()
if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans {
t.Errorf("number of exported span: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}

gotBatchCount := te.getBatchCount()
if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
})
}
}

type stuckExporter struct {
testBatchExporter
}
Expand Down
59 changes: 59 additions & 0 deletions sdk/trace/env.go
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"os"
"strconv"

"go.opentelemetry.io/otel/internal/global"
)

// Environment variable names
const (
// EnvBatchSpanProcessorScheduleDelay
// Delay interval between two consecutive exports.
// i.e. 5000
EnvBatchSpanProcessorScheduleDelay = "OTEL_BSP_SCHEDULE_DELAY"
// EnvBatchSpanProcessorExportTimeout
// Maximum allowed time to export data.
// i.e. 3000
EnvBatchSpanProcessorExportTimeout = "OTEL_BSP_EXPORT_TIMEOUT"
// EnvBatchSpanProcessorMaxQueueSize
// Maximum queue size
// i.e. 2048
EnvBatchSpanProcessorMaxQueueSize = "OTEL_BSP_MAX_QUEUE_SIZE"
// EnvBatchSpanProcessorMaxExportBatchSize
// Maximum batch size
// Note: Must be less than or equal to EnvBatchSpanProcessorMaxQueueSize
// i.e. 512
EnvBatchSpanProcessorMaxExportBatchSize = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
)

// intEnvOr returns an env variable's numeric value if it is exists (and valid) or the default if not
func intEnvOr(key string, defaultValue int) int {
value, ok := os.LookupEnv(key)
if !ok {
return defaultValue
}

intValue, err := strconv.Atoi(value)
if err != nil {
global.Info("Got invalid value, number value expected.", key, value)
return defaultValue
}

return intValue
}
60 changes: 60 additions & 0 deletions sdk/trace/env_test.go
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ottest "go.opentelemetry.io/otel/internal/internaltest"
)

func TestIntEnvOr(t *testing.T) {
testCases := []struct {
name string
envValue string
defaultValue int
expectedValue int
}{
{
name: "IntEnvOrTest - Basic",
envValue: "2500",
defaultValue: 500,
expectedValue: 2500,
},
{
name: "IntEnvOrTest - Invalid Number",
envValue: "localhost",
defaultValue: 500,
expectedValue: 500,
},
}

envStore := ottest.NewEnvStore()
envStore.Record(EnvBatchSpanProcessorMaxQueueSize)
defer func() {
require.NoError(t, envStore.Restore())
}()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, os.Setenv(EnvBatchSpanProcessorMaxQueueSize, tc.envValue))
actualValue := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, tc.defaultValue)
assert.Equal(t, tc.expectedValue, actualValue)
})
}
}

0 comments on commit 3cf35bd

Please sign in to comment.