Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add env support for batch span processor #2515

Merged
merged 4 commits into from Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Support `OTEL_EXPORTER_ZIPKIN_ENDPOINT` env to specify zipkin collector endpoint (#2490)
- Log the configuration of TracerProviders, and Tracers for debugging. To enable use a logger with Verbosity (V level) >=1
- Added environment variables for: `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, `OTEL_BSP_MAX_QUEUE_SIZE` and `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` (#2515)
pellared marked this conversation as resolved.
Show resolved Hide resolved

### Changed

Expand Down
23 changes: 17 additions & 6 deletions sdk/trace/batch_span_processor.go
Expand Up @@ -29,8 +29,8 @@ import (
// Defaults for BatchSpanProcessorOptions.
const (
DefaultMaxQueueSize = 2048
DefaultBatchTimeout = 5000 * time.Millisecond
DefaultExportTimeout = 30000 * time.Millisecond
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)

Expand Down Expand Up @@ -89,11 +89,22 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil)
//
// If the exporter is nil, the span processor will preform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
maxQueueSize := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, DefaultMaxQueueSize)
maxExportBatchSize := intEnvOr(EnvBatchSpanProcessorMaxExportBatchSize, DefaultMaxExportBatchSize)

if maxExportBatchSize > maxQueueSize {
if DefaultMaxExportBatchSize > maxQueueSize {
maxExportBatchSize = maxQueueSize
} else {
maxExportBatchSize = DefaultMaxExportBatchSize
}
}

o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout,
ExportTimeout: DefaultExportTimeout,
MaxQueueSize: DefaultMaxQueueSize,
MaxExportBatchSize: DefaultMaxExportBatchSize,
BatchTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorScheduleDelay, DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorExportTimeout, DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
for _, opt := range options {
opt(&o)
Expand Down
83 changes: 83 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Expand Up @@ -19,10 +19,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

ottest "go.opentelemetry.io/otel/internal/internaltest"

"github.com/go-logr/logr/funcr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,6 +118,7 @@ type testOption struct {
wantBatchCount int
genNumSpans int
parallel bool
envs map[string]string
}

func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
Expand Down Expand Up @@ -221,6 +225,85 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
}
}

func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) {
options := []testOption{
{
name: "BatchSpanProcessorEnvOptions - Basic",
wantNumSpans: 2053,
wantBatchCount: 1,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "5000",
},
},
{
name: "BatchSpanProcessorEnvOptions - A lager max export batch size than queue size",
wantNumSpans: 2053,
wantBatchCount: 4,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000",
},
},
{
name: "BatchSpanProcessorEnvOptions - A lage max export batch size with a small queue size",
wantNumSpans: 2053,
wantBatchCount: 42,
genNumSpans: 2053,
envs: map[string]string{
sdktrace.EnvBatchSpanProcessorMaxQueueSize: "50",
sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000",
},
},
}

envStore := ottest.NewEnvStore()
envStore.Record(sdktrace.EnvBatchSpanProcessorScheduleDelay)
envStore.Record(sdktrace.EnvBatchSpanProcessorExportTimeout)
envStore.Record(sdktrace.EnvBatchSpanProcessorMaxQueueSize)
envStore.Record(sdktrace.EnvBatchSpanProcessorMaxExportBatchSize)

defer func() {
require.NoError(t, envStore.Restore())
}()

for _, option := range options {
t.Run(option.name, func(t *testing.T) {
for k, v := range option.envs {
require.NoError(t, os.Setenv(k, v))
}

te := testBatchExporter{}
tp := basicTracerProvider(t)
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOptions")

generateSpan(t, option.parallel, tr, option)

tp.UnregisterSpanProcessor(ssp)

gotNumOfSpans := te.len()
if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans {
t.Errorf("number of exported span: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}

gotBatchCount := te.getBatchCount()
if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
})
}
}

type stuckExporter struct {
testBatchExporter
}
Expand Down
59 changes: 59 additions & 0 deletions sdk/trace/env.go
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"os"
"strconv"

"go.opentelemetry.io/otel/internal/global"
)

// Environment variable names
const (
// EnvBatchSpanProcessorScheduleDelay
// Delay interval between two consecutive exports.
// i.e. 5000
EnvBatchSpanProcessorScheduleDelay = "OTEL_BSP_SCHEDULE_DELAY"
// EnvBatchSpanProcessorExportTimeout
// Maximum allowed time to export data.
// i.e. 3000
EnvBatchSpanProcessorExportTimeout = "OTEL_BSP_EXPORT_TIMEOUT"
// EnvBatchSpanProcessorMaxQueueSize
// Maximum queue size
// i.e. 2048
EnvBatchSpanProcessorMaxQueueSize = "OTEL_BSP_MAX_QUEUE_SIZE"
// EnvBatchSpanProcessorMaxExportBatchSize
// Maximum batch size
// Note: Must be less than or equal to EnvBatchSpanProcessorMaxQueueSize
// i.e. 512
EnvBatchSpanProcessorMaxExportBatchSize = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
)

// intEnvOr returns an env variable's numeric value if it is exists (and valid) or the default if not
func intEnvOr(key string, defaultValue int) int {
value, ok := os.LookupEnv(key)
if !ok {
return defaultValue
}

intValue, err := strconv.Atoi(value)
if err != nil {
global.Info("Got invalid value, number value expected.", key, value)
return defaultValue
}

return intValue
}
60 changes: 60 additions & 0 deletions sdk/trace/env_test.go
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ottest "go.opentelemetry.io/otel/internal/internaltest"
)

func TestIntEnvOr(t *testing.T) {
testCases := []struct {
name string
envValue string
defaultValue int
expectedValue int
}{
{
name: "IntEnvOrTest - Basic",
envValue: "2500",
defaultValue: 500,
expectedValue: 2500,
},
{
name: "IntEnvOrTest - Invalid Number",
envValue: "localhost",
defaultValue: 500,
expectedValue: 500,
},
}

envStore := ottest.NewEnvStore()
envStore.Record(EnvBatchSpanProcessorMaxQueueSize)
defer func() {
require.NoError(t, envStore.Restore())
}()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, os.Setenv(EnvBatchSpanProcessorMaxQueueSize, tc.envValue))
actualValue := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, tc.defaultValue)
assert.Equal(t, tc.expectedValue, actualValue)
})
}
}