Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk/metric: Remove Reader.ForceFlush and ManualReader.ForceFlush #4375

Merged
merged 12 commits into from
Aug 11, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377)
- Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408)

### Removed

- Remove `Reader.ForceFlush` in `go.opentelemetry.io/otel/metric`. Notice that `PeriodicReader.ForceFlush` is still available. (#4375)
pellared marked this conversation as resolved.
Show resolved Hide resolved

### Fixed

- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
var fFuncs, sFuncs []func(context.Context) error
for _, r := range c.readers {
sFuncs = append(sFuncs, r.Shutdown)
fFuncs = append(fFuncs, r.ForceFlush)
if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok {
fFuncs = append(fFuncs, f.ForceFlush)
}
pellared marked this conversation as resolved.
Show resolved Hide resolved
}

return unify(fFuncs), unifyShutdown(sFuncs)
Expand Down
7 changes: 0 additions & 7 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation
return mr.aggregationSelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
//
// This method is safe to call concurrently.
func (mr *ManualReader) ForceFlush(context.Context) error {
return nil
}

// Shutdown closes any connections and frees any resources used by the reader.
//
// This method is safe to call concurrently.
Expand Down
12 changes: 11 additions & 1 deletion sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (e *fnExporter) Shutdown(ctx context.Context) error {
type periodicReaderTestSuite struct {
*readerTestSuite

ErrReader Reader
ErrReader *PeriodicReader
}

func (ts *periodicReaderTestSuite) SetupTest() {
Expand Down Expand Up @@ -425,6 +426,15 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
})
}

func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
ctx := context.Background()
r := NewPeriodicReader(new(fnExporter))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.ForceFlush(ctx))
}

func BenchmarkPeriodicReader(b *testing.B) {
b.Run("Collect", benchReaderCollectFunc(
NewPeriodicReader(new(fnExporter)),
Expand Down
10 changes: 0 additions & 10 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,6 @@ type Reader interface {
// passed context is expected to be honored.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

// ForceFlush flushes all metric measurements held in an export pipeline.
//
// This deadline or cancellation of the passed context are honored. An appropriate
// error will be returned in these situations. There is no guaranteed that all
// telemetry be flushed or all resources have been released in these
// situations.
//
// This method needs to be concurrent safe.
ForceFlush(context.Context) error

// Shutdown flushes all metric measurements held in an export pipeline and releases any
// held computational resources.
//
Expand Down
20 changes: 7 additions & 13 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ func (ts *readerTestSuite) TestShutdownTwice() {
ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown)
}

func (ts *readerTestSuite) TestMultipleForceFlush() {
ctx := context.Background()
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.ForceFlush(ctx))
ts.NoError(ts.Reader.ForceFlush(ctx))
}

func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
Expand Down Expand Up @@ -186,11 +178,13 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() {
_ = ts.Reader.Collect(ctx, nil)
}()

wg.Add(1)
go func() {
defer wg.Done()
_ = ts.Reader.ForceFlush(ctx)
}()
if f, ok := ts.Reader.(interface{ ForceFlush(context.Context) error }); ok {
wg.Add(1)
go func() {
defer wg.Done()
_ = f.ForceFlush(ctx)
}()
}

wg.Add(1)
go func() {
Expand Down