Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics: Rename sdk/export/metric.ExportKind to aggregation.Temporality #2274

Merged
merged 13 commits into from Oct 15, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## Changed

- Skip links with invalid span context. (#2275)
- Metric SDK `export.ExportKind`, `export.ExportKindSelector` types have been renamed to `aggregation.Temporality` and `aggregation.TemporalitySelector` respectively to keep in line with current specification and protocol along with built-in selectors (e.g., `aggregation.CumulativeTemporalitySelector`, ...). (#2274)
- The Metric `Exporter` interface now requires a `TemporalitySelector` method instead of an `ExportKindSelector`. (#2274)
- Metrics API cleanup. The `metric/sdkapi` package has been created to relocate the API-to-SDK interface:
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
Expand Down
2 changes: 1 addition & 1 deletion bridge/opencensus/exporter.go
Expand Up @@ -79,7 +79,7 @@ var _ export.Reader = &metricReader{}

// ForEach iterates through the metrics data, synthesizing an
// export.Record with the appropriate aggregation for the exporter.
func (d *metricReader) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error {
func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.Record) error) error {
for _, m := range d.metrics {
descriptor, err := convertDescriptor(m.Descriptor)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bridge/opencensus/exporter_test.go
Expand Up @@ -48,7 +48,7 @@ type fakeExporter struct {
}

func (f *fakeExporter) Export(ctx context.Context, res *resource.Resource, ilr exportmetric.InstrumentationLibraryReader) error {
return controllertest.ReadAll(ilr, export.StatelessExportKindSelector(),
return controllertest.ReadAll(ilr, aggregation.StatelessTemporalitySelector(),
func(_ instrumentation.Library, record exportmetric.Record) error {
f.resource = res
f.records = append(f.records, record)
Expand Down
4 changes: 2 additions & 2 deletions example/prometheus/main.go
Expand Up @@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
Expand All @@ -44,7 +44,7 @@ func initMeter() {
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
),
export.CumulativeExportKindSelector(),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
)
Expand Down
16 changes: 8 additions & 8 deletions exporters/otlp/otlpmetric/exporter.go
Expand Up @@ -33,8 +33,8 @@ var (

// Exporter exports metrics data in the OTLP wire format.
type Exporter struct {
client Client
exportKindSelector metricsdk.ExportKindSelector
client Client
temporalitySelector aggregation.TemporalitySelector

mu sync.RWMutex
started bool
Expand Down Expand Up @@ -96,8 +96,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
return err
}

func (e *Exporter) ExportKindFor(descriptor *sdkapi.Descriptor, aggregatorKind aggregation.Kind) metricsdk.ExportKind {
return e.exportKindSelector.ExportKindFor(descriptor, aggregatorKind)
func (e *Exporter) TemporalityFor(descriptor *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
return e.temporalitySelector.TemporalityFor(descriptor, kind)
}

var _ metricsdk.Exporter = (*Exporter)(nil)
Expand All @@ -114,19 +114,19 @@ func New(ctx context.Context, client Client, opts ...Option) (*Exporter, error)
// NewUnstarted constructs a new Exporter and does not start it.
func NewUnstarted(client Client, opts ...Option) *Exporter {
cfg := config{
// Note: the default ExportKindSelector is specified
// Note: the default TemporalitySelector is specified
// as Cumulative:
// https://github.com/open-telemetry/opentelemetry-specification/issues/731
exportKindSelector: metricsdk.CumulativeExportKindSelector(),
temporalitySelector: aggregation.CumulativeTemporalitySelector(),
}

for _, opt := range opts {
opt.apply(&cfg)
}

e := &Exporter{
client: client,
exportKindSelector: cfg.exportKindSelector,
client: client,
temporalitySelector: cfg.temporalitySelector,
}

return e
Expand Down
7 changes: 4 additions & 3 deletions exporters/otlp/otlpmetric/exporter_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
Expand Down Expand Up @@ -606,7 +607,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
)
}

func TestStatelessExportKind(t *testing.T) {
func TestStatelessAggregationTemporality(t *testing.T) {
type testcase struct {
name string
instrumentKind sdkapi.InstrumentKind
Expand All @@ -624,8 +625,8 @@ func TestStatelessExportKind(t *testing.T) {
runMetricExportTests(
t,
[]otlpmetric.Option{
otlpmetric.WithMetricExportKindSelector(
metricsdk.StatelessExportKindSelector(),
otlpmetric.WithMetricAggregationTemporalitySelector(
aggregation.StatelessTemporalitySelector(),
),
},
testerAResource,
Expand Down
38 changes: 19 additions & 19 deletions exporters/otlp/otlpmetric/internal/metrictransform/metric.go
Expand Up @@ -72,12 +72,12 @@ func toNanos(t time.Time) uint64 {

// InstrumentationLibraryReader transforms all records contained in a checkpoint into
// batched OTLP ResourceMetrics.
func InstrumentationLibraryReader(ctx context.Context, exportSelector export.ExportKindSelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) {
func InstrumentationLibraryReader(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) {
var ilms []*metricpb.InstrumentationLibraryMetrics

err := ilmr.ForEach(func(lib instrumentation.Library, mr export.Reader) error {

records, errc := source(ctx, exportSelector, mr)
records, errc := source(ctx, temporalitySelector, mr)

// Start a fixed number of goroutines to transform records.
transformed := make(chan result)
Expand All @@ -86,7 +86,7 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp
for i := uint(0); i < numWorkers; i++ {
go func() {
defer wg.Done()
transformer(ctx, exportSelector, records, transformed)
transformer(ctx, temporalitySelector, records, transformed)
}()
}
go func() {
Expand Down Expand Up @@ -134,14 +134,14 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp
// source starts a goroutine that sends each one of the Records yielded by
// the Reader on the returned chan. Any error encountered will be sent
// on the returned error chan after seeding is complete.
func source(ctx context.Context, exportSelector export.ExportKindSelector, mr export.Reader) (<-chan export.Record, <-chan error) {
func source(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, mr export.Reader) (<-chan export.Record, <-chan error) {
errc := make(chan error, 1)
out := make(chan export.Record)
// Seed records into process.
go func() {
defer close(out)
// No select is needed since errc is buffered.
errc <- mr.ForEach(exportSelector, func(r export.Record) error {
errc <- mr.ForEach(temporalitySelector, func(r export.Record) error {
select {
case <-ctx.Done():
return ErrContextCanceled
Expand All @@ -155,9 +155,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, mr ex

// transformer transforms records read from the passed in chan into
// OTLP Metrics which are sent on the out chan.
func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) {
func transformer(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, in <-chan export.Record, out chan<- result) {
for r := range in {
m, err := Record(exportSelector, r)
m, err := Record(temporalitySelector, r)
// Propagate errors, but do not send empty results.
if err == nil && m == nil {
continue
Expand Down Expand Up @@ -237,7 +237,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) {

// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg
// error is returned if the Record Aggregator is not supported.
func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) {
func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record) (*metricpb.Metric, error) {
agg := r.Aggregation()
switch agg.Kind() {
case aggregation.MinMaxSumCountKind:
Expand All @@ -252,7 +252,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp
if !ok {
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
}
return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h)
return histogramPoint(r, temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.HistogramKind), h)

case aggregation.SumKind:
s, ok := agg.(aggregation.Sum)
Expand All @@ -263,7 +263,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp
if err != nil {
return nil, err
}
return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())
return sumPoint(r, sum, r.StartTime(), r.EndTime(), temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic())

case aggregation.LastValueKind:
lv, ok := agg.(aggregation.LastValue)
Expand Down Expand Up @@ -388,17 +388,17 @@ func gaugePoint(record export.Record, num number.Number, start, end time.Time) (
return m, nil
}

func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality {
switch ek {
case export.DeltaExportKind:
func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.AggregationTemporality {
switch temporality {
case aggregation.DeltaTemporality:
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
case export.CumulativeExportKind:
case aggregation.CumulativeTemporality:
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE
}
return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED
}

func sumPoint(record export.Record, num number.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) {
func sumPoint(record export.Record, num number.Number, start, end time.Time, temporality aggregation.Temporality, monotonic bool) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()

Expand All @@ -413,7 +413,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek
m.Data = &metricpb.Metric_Sum{
Sum: &metricpb.Sum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
AggregationTemporality: sdkTemporalityToTemporality(temporality),
DataPoints: []*metricpb.NumberDataPoint{
{
Value: &metricpb.NumberDataPoint_AsInt{
Expand All @@ -430,7 +430,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek
m.Data = &metricpb.Metric_Sum{
Sum: &metricpb.Sum{
IsMonotonic: monotonic,
AggregationTemporality: exportKindToTemporality(ek),
AggregationTemporality: sdkTemporalityToTemporality(temporality),
DataPoints: []*metricpb.NumberDataPoint{
{
Value: &metricpb.NumberDataPoint_AsDouble{
Expand Down Expand Up @@ -522,7 +522,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []ui
}

// histogram transforms a Histogram Aggregator into an OTLP Metric.
func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) {
func histogramPoint(record export.Record, temporality aggregation.Temporality, a aggregation.Histogram) (*metricpb.Metric, error) {
desc := record.Descriptor()
labels := record.Labels()
boundaries, counts, err := histogramValues(a)
Expand All @@ -546,7 +546,7 @@ func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Hi
Unit: string(desc.Unit()),
Data: &metricpb.Metric_Histogram{
Histogram: &metricpb.Histogram{
AggregationTemporality: exportKindToTemporality(ek),
AggregationTemporality: sdkTemporalityToTemporality(temporality),
DataPoints: []*metricpb.HistogramDataPoint{
{
Sum: sum.CoerceToFloat64(desc.NumberKind()),
Expand Down
Expand Up @@ -190,7 +190,7 @@ func TestSumIntDataPoints(t *testing.T) {
value, err := ckpt.Sum()
require.NoError(t, err)

if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true); assert.NoError(t, err) {
assert.Nil(t, m.GetGauge())
assert.Equal(t, &metricpb.Sum{
AggregationTemporality: otelCumulative,
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestSumFloatDataPoints(t *testing.T) {
value, err := ckpt.Sum()
require.NoError(t, err)

if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) {
if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.DeltaTemporality, false); assert.NoError(t, err) {
assert.Nil(t, m.GetGauge())
assert.Equal(t, &metricpb.Sum{
IsMonotonic: false,
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestSumErrUnknownValueType(t *testing.T) {
value, err := s.Sum()
require.NoError(t, err)

_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true)
_, err = sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {
t.Errorf("expected ErrUnknownValueType, got %v", err)
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
kind: kind,
agg: agg,
}
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd))
return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd))
}

mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0])
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) {
desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind)
labels := attribute.NewSet()
return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd))
return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd))
}

errEx := fmt.Errorf("timeout")
Expand Down
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
exportmetric "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
Expand All @@ -40,7 +40,7 @@ import (
// themselves.
func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter, mcMetrics Collector) {
selector := simple.NewWithInexpensiveDistribution()
proc := processor.NewFactory(selector, exportmetric.StatelessExportKindSelector())
proc := processor.NewFactory(selector, aggregation.StatelessTemporalitySelector())
cont := controller.New(proc, controller.WithExporter(exp))
require.NoError(t, cont.Start(ctx))

Expand Down
14 changes: 7 additions & 7 deletions exporters/otlp/otlpmetric/options.go
Expand Up @@ -14,7 +14,7 @@

package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric"

import metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
import "go.opentelemetry.io/otel/sdk/export/metric/aggregation"

// Option are setting options passed to an Exporter on creation.
type Option interface {
Expand All @@ -28,15 +28,15 @@ func (fn exporterOptionFunc) apply(cfg *config) {
}

type config struct {
exportKindSelector metricsdk.ExportKindSelector
temporalitySelector aggregation.TemporalitySelector
}

// WithMetricExportKindSelector defines the ExportKindSelector used
// for selecting AggregationTemporality (i.e., Cumulative vs. Delta
// WithMetricAggregationTemporalitySelector defines the aggregation.TemporalitySelector used
// for selecting aggregation.Temporality (i.e., Cumulative vs. Delta
// aggregation). If not specified otherwise, exporter will use a
// cumulative export kind selector.
func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) Option {
// cumulative temporality selector.
func WithMetricAggregationTemporalitySelector(selector aggregation.TemporalitySelector) Option {
return exporterOptionFunc(func(cfg *config) {
cfg.exportKindSelector = selector
cfg.temporalitySelector = selector
})
}
6 changes: 3 additions & 3 deletions exporters/prometheus/prometheus.go
Expand Up @@ -132,9 +132,9 @@ func (e *Exporter) Controller() *controller.Controller {
return e.controller
}

// ExportKindFor implements ExportKindSelector.
func (e *Exporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) export.ExportKind {
return export.CumulativeExportKindSelector().ExportKindFor(desc, kind)
// TemporalityFor implements TemporalitySelector.
func (e *Exporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
return aggregation.CumulativeTemporalitySelector().TemporalityFor(desc, kind)
}

// ServeHTTP implements http.Handler.
Expand Down
4 changes: 2 additions & 2 deletions exporters/prometheus/prometheus_test.go
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
Expand Down Expand Up @@ -88,7 +88,7 @@ func newPipeline(config prometheus.Config, options ...controller.Option) (*prome
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
),
export.CumulativeExportKindSelector(),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
options...,
Expand Down
4 changes: 2 additions & 2 deletions exporters/stdout/stdoutmetric/metric.go
Expand Up @@ -47,8 +47,8 @@ type line struct {
Timestamp *time.Time `json:"Timestamp,omitempty"`
}

func (e *metricExporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) exportmetric.ExportKind {
return exportmetric.StatelessExportKindSelector().ExportKindFor(desc, kind)
func (e *metricExporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
return aggregation.StatelessTemporalitySelector().TemporalityFor(desc, kind)
}

func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reader exportmetric.InstrumentationLibraryReader) error {
Expand Down