Skip to content

Commit

Permalink
sdk/metric: Reader factories return structs (#4244)
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared committed Jun 22, 2023
1 parent ca2aa83 commit 6b262b4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `ManualReader` struct in `go.opentelemetry.io/otel/sdk/metric`. (#4244)
- Add `PeriodicReader` struct in `go.opentelemetry.io/otel/sdk/metric`. (#4244)

### Changed

- Starting from `v1.21.0` of semantic conventions, `go.opentelemetry.io/otel/semconv/{version}/httpconv` and `go.opentelemetry.io/otel/semconv/{version}/netconv` packages will no longer be published. (#4145)
- Log duplicate instrument conflict at a warning level instead of info in `go.opentelemetry.io/otel/sdk/metric`. (#4202)
- Return an error on the creation of new instruments if their name doesn't pass regexp validation. (#4210)
- `NewManualReader` in `go.opentelemetry.io/otel/sdk/metric` returns `*ManualReader` instead of `Reader`. (#4244)
- `NewPeriodicReader` in `go.opentelemetry.io/otel/sdk/metric` returns `*PeriodicReader` instead of `Reader`. (#4244)

## [1.16.0/0.39.0] 2023-05-18

Expand Down
24 changes: 12 additions & 12 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// manualReader is a simple Reader that allows an application to
// ManualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
type ManualReader struct {
sdkProducer atomic.Value
shutdownOnce sync.Once

Expand All @@ -41,12 +41,12 @@ type manualReader struct {
}

// Compile time check the manualReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&manualReader{}: {}}
var _ = map[Reader]struct{}{&ManualReader{}: {}}

// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
func NewManualReader(opts ...ManualReaderOption) *ManualReader {
cfg := newManualReaderConfig(opts)
r := &manualReader{
r := &ManualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
Expand All @@ -56,7 +56,7 @@ func NewManualReader(opts ...ManualReaderOption) Reader {

// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
func (mr *ManualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
Expand All @@ -66,7 +66,7 @@ func (mr *manualReader) register(p sdkProducer) {

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
func (mr *ManualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
Expand All @@ -80,22 +80,22 @@ func (mr *manualReader) RegisterProducer(p Producer) {
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
}

// aggregation returns what Aggregation to use for kind.
func (mr *manualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return mr.aggregationSelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
func (mr *manualReader) ForceFlush(context.Context) error {
func (mr *ManualReader) ForceFlush(context.Context) error {
return nil
}

// Shutdown closes any connections and frees any resources used by the reader.
func (mr *manualReader) Shutdown(context.Context) error {
func (mr *ManualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
Expand All @@ -117,7 +117,7 @@ func (mr *manualReader) Shutdown(context.Context) error {
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
Expand Down
32 changes: 16 additions & 16 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
// The Collect method of the returned Reader continues to gather and return
// metric data to the user. It will not automatically send that data to the
// exporter. That is left to the user to accomplish.
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reader {
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &periodicReader{
r := &PeriodicReader{
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
Expand All @@ -135,9 +135,9 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
return r
}

// periodicReader is a Reader that continuously collects and exports metric
// PeriodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
type PeriodicReader struct {
sdkProducer atomic.Value

mu sync.Mutex
Expand All @@ -156,14 +156,14 @@ type periodicReader struct {
}

// Compile time check the periodicReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&periodicReader{}: {}}
var _ = map[Reader]struct{}{&PeriodicReader{}: {}}

// newTicker allows testing override.
var newTicker = time.NewTicker

// run continuously collects and exports metric data at the specified
// interval. This will run until ctx is canceled or times out.
func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
ticker := newTicker(interval)
defer ticker.Stop()

Expand All @@ -184,7 +184,7 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p sdkProducer) {
func (r *PeriodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
Expand All @@ -193,7 +193,7 @@ func (r *periodicReader) register(p sdkProducer) {
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
func (r *PeriodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
Expand All @@ -207,18 +207,18 @@ func (r *periodicReader) RegisterProducer(p Producer) {
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
}

// aggregation returns what Aggregation to use for kind.
func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (r *PeriodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.exporter.Aggregation(kind)
}

// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
Expand All @@ -235,7 +235,7 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
// handle that if desired.
//
// An error is returned if this is called after Shutdown. An error is return if rm is nil.
func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
Expand All @@ -244,7 +244,7 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
}

// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
if p == nil {
return ErrReaderNotRegistered
}
Expand Down Expand Up @@ -275,14 +275,14 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricd
}

// export exports metric data m using r's exporter.
func (r *periodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
c, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.exporter.Export(c, m)
}

// ForceFlush flushes pending telemetry.
func (r *periodicReader) ForceFlush(ctx context.Context) error {
func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
errCh := make(chan error, 1)
select {
case r.flushCh <- errCh:
Expand All @@ -304,7 +304,7 @@ func (r *periodicReader) ForceFlush(ctx context.Context) error {
}

// Shutdown flushes pending telemetry and then stops the export pipeline.
func (r *periodicReader) Shutdown(ctx context.Context) error {
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Stop the run loop.
Expand Down

0 comments on commit 6b262b4

Please sign in to comment.