diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fcfa23c942..679a0b0e96b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Changed - Skip links with invalid span context. (#2275) +- Metric SDK `export.ExportKind`, `export.ExportKindSelector` types have been renamed to `aggregation.Temporality` and `aggregation.TemporalitySelector` respectively to keep in line with current specification and protocol along with built-in selectors (e.g., `aggregation.CumulativeTemporalitySelector`, ...). (#2274) +- The Metric `Exporter` interface now requires a `TemporalitySelector` method instead of an `ExportKindSelector`. (#2274) - Metrics API cleanup. The `metric/sdkapi` package has been created to relocate the API-to-SDK interface: - The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner` - The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`. diff --git a/bridge/opencensus/exporter.go b/bridge/opencensus/exporter.go index 1bc4d5fc115..d52494367d1 100644 --- a/bridge/opencensus/exporter.go +++ b/bridge/opencensus/exporter.go @@ -79,7 +79,7 @@ var _ export.Reader = &metricReader{} // ForEach iterates through the metrics data, synthesizing an // export.Record with the appropriate aggregation for the exporter. -func (d *metricReader) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error { +func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.Record) error) error { for _, m := range d.metrics { descriptor, err := convertDescriptor(m.Descriptor) if err != nil { diff --git a/bridge/opencensus/exporter_test.go b/bridge/opencensus/exporter_test.go index 3c0d89db8a5..710369ba5bb 100644 --- a/bridge/opencensus/exporter_test.go +++ b/bridge/opencensus/exporter_test.go @@ -48,7 +48,7 @@ type fakeExporter struct { } func (f *fakeExporter) Export(ctx context.Context, res *resource.Resource, ilr exportmetric.InstrumentationLibraryReader) error { - return controllertest.ReadAll(ilr, export.StatelessExportKindSelector(), + return controllertest.ReadAll(ilr, aggregation.StatelessTemporalitySelector(), func(_ instrumentation.Library, record exportmetric.Record) error { f.resource = res f.records = append(f.records, record) diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 2ca4eb87443..968beb3d483 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -26,7 +26,7 @@ import ( "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/global" - export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -44,7 +44,7 @@ func initMeter() { selector.NewWithHistogramDistribution( histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries), ), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), processor.WithMemory(true), ), ) diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index cc3e0658842..798b690be01 100644 --- 
a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -33,8 +33,8 @@ var ( // Exporter exports metrics data in the OTLP wire format. type Exporter struct { - client Client - exportKindSelector metricsdk.ExportKindSelector + client Client + temporalitySelector aggregation.TemporalitySelector mu sync.RWMutex started bool @@ -96,8 +96,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error { return err } -func (e *Exporter) ExportKindFor(descriptor *sdkapi.Descriptor, aggregatorKind aggregation.Kind) metricsdk.ExportKind { - return e.exportKindSelector.ExportKindFor(descriptor, aggregatorKind) +func (e *Exporter) TemporalityFor(descriptor *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality { + return e.temporalitySelector.TemporalityFor(descriptor, kind) } var _ metricsdk.Exporter = (*Exporter)(nil) @@ -114,10 +114,10 @@ func New(ctx context.Context, client Client, opts ...Option) (*Exporter, error) // NewUnstarted constructs a new Exporter and does not start it. func NewUnstarted(client Client, opts ...Option) *Exporter { cfg := config{ - // Note: the default ExportKindSelector is specified + // Note: the default TemporalitySelector is specified // as Cumulative: // https://github.com/open-telemetry/opentelemetry-specification/issues/731 - exportKindSelector: metricsdk.CumulativeExportKindSelector(), + temporalitySelector: aggregation.CumulativeTemporalitySelector(), } for _, opt := range opts { @@ -125,8 +125,8 @@ func NewUnstarted(client Client, opts ...Option) *Exporter { } e := &Exporter{ - client: client, - exportKindSelector: cfg.exportKindSelector, + client: client, + temporalitySelector: cfg.temporalitySelector, } return e diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index f329627d0c4..733d1978826 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -33,6 +33,7 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -606,7 +607,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { ) } -func TestStatelessExportKind(t *testing.T) { +func TestStatelessAggregationTemporality(t *testing.T) { type testcase struct { name string instrumentKind sdkapi.InstrumentKind @@ -624,8 +625,8 @@ func TestStatelessExportKind(t *testing.T) { runMetricExportTests( t, []otlpmetric.Option{ - otlpmetric.WithMetricExportKindSelector( - metricsdk.StatelessExportKindSelector(), + otlpmetric.WithMetricAggregationTemporalitySelector( + aggregation.StatelessTemporalitySelector(), ), }, testerAResource, diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go index 8774db45f1e..03f55fad873 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go @@ -72,12 +72,12 @@ func toNanos(t time.Time) uint64 { // InstrumentationLibraryReader transforms all records contained in a checkpoint into // batched OTLP ResourceMetrics. 
-func InstrumentationLibraryReader(ctx context.Context, exportSelector export.ExportKindSelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) { +func InstrumentationLibraryReader(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) { var ilms []*metricpb.InstrumentationLibraryMetrics err := ilmr.ForEach(func(lib instrumentation.Library, mr export.Reader) error { - records, errc := source(ctx, exportSelector, mr) + records, errc := source(ctx, temporalitySelector, mr) // Start a fixed number of goroutines to transform records. transformed := make(chan result) @@ -86,7 +86,7 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp for i := uint(0); i < numWorkers; i++ { go func() { defer wg.Done() - transformer(ctx, exportSelector, records, transformed) + transformer(ctx, temporalitySelector, records, transformed) }() } go func() { @@ -134,14 +134,14 @@ func InstrumentationLibraryReader(ctx context.Context, exportSelector export.Exp // source starts a goroutine that sends each one of the Records yielded by // the Reader on the returned chan. Any error encountered will be sent // on the returned error chan after seeding is complete. -func source(ctx context.Context, exportSelector export.ExportKindSelector, mr export.Reader) (<-chan export.Record, <-chan error) { +func source(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, mr export.Reader) (<-chan export.Record, <-chan error) { errc := make(chan error, 1) out := make(chan export.Record) // Seed records into process. go func() { defer close(out) // No select is needed since errc is buffered. - errc <- mr.ForEach(exportSelector, func(r export.Record) error { + errc <- mr.ForEach(temporalitySelector, func(r export.Record) error { select { case <-ctx.Done(): return ErrContextCanceled @@ -155,9 +155,9 @@ func source(ctx context.Context, exportSelector export.ExportKindSelector, mr ex // transformer transforms records read from the passed in chan into // OTLP Metrics which are sent on the out chan. -func transformer(ctx context.Context, exportSelector export.ExportKindSelector, in <-chan export.Record, out chan<- result) { +func transformer(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, in <-chan export.Record, out chan<- result) { for r := range in { - m, err := Record(exportSelector, r) + m, err := Record(temporalitySelector, r) // Propagate errors, but do not send empty results. if err == nil && m == nil { continue @@ -237,7 +237,7 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) { // Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg // error is returned if the Record Aggregator is not supported. 
-func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricpb.Metric, error) { +func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record) (*metricpb.Metric, error) { agg := r.Aggregation() switch agg.Kind() { case aggregation.MinMaxSumCountKind: @@ -252,7 +252,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp if !ok { return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) } - return histogramPoint(r, exportSelector.ExportKindFor(r.Descriptor(), aggregation.HistogramKind), h) + return histogramPoint(r, temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.HistogramKind), h) case aggregation.SumKind: s, ok := agg.(aggregation.Sum) @@ -263,7 +263,7 @@ func Record(exportSelector export.ExportKindSelector, r export.Record) (*metricp if err != nil { return nil, err } - return sumPoint(r, sum, r.StartTime(), r.EndTime(), exportSelector.ExportKindFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic()) + return sumPoint(r, sum, r.StartTime(), r.EndTime(), temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic()) case aggregation.LastValueKind: lv, ok := agg.(aggregation.LastValue) @@ -388,17 +388,17 @@ func gaugePoint(record export.Record, num number.Number, start, end time.Time) ( return m, nil } -func exportKindToTemporality(ek export.ExportKind) metricpb.AggregationTemporality { - switch ek { - case export.DeltaExportKind: +func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.AggregationTemporality { + switch temporality { + case aggregation.DeltaTemporality: return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA - case export.CumulativeExportKind: + case aggregation.CumulativeTemporality: return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE } return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED } -func sumPoint(record export.Record, num number.Number, start, end time.Time, ek export.ExportKind, monotonic bool) (*metricpb.Metric, error) { +func sumPoint(record export.Record, num number.Number, start, end time.Time, temporality aggregation.Temporality, monotonic bool) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() @@ -413,7 +413,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek m.Data = &metricpb.Metric_Sum{ Sum: &metricpb.Sum{ IsMonotonic: monotonic, - AggregationTemporality: exportKindToTemporality(ek), + AggregationTemporality: sdkTemporalityToTemporality(temporality), DataPoints: []*metricpb.NumberDataPoint{ { Value: &metricpb.NumberDataPoint_AsInt{ @@ -430,7 +430,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, ek m.Data = &metricpb.Metric_Sum{ Sum: &metricpb.Sum{ IsMonotonic: monotonic, - AggregationTemporality: exportKindToTemporality(ek), + AggregationTemporality: sdkTemporalityToTemporality(temporality), DataPoints: []*metricpb.NumberDataPoint{ { Value: &metricpb.NumberDataPoint_AsDouble{ @@ -522,7 +522,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []ui } // histogram transforms a Histogram Aggregator into an OTLP Metric. 
-func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Histogram) (*metricpb.Metric, error) { +func histogramPoint(record export.Record, temporality aggregation.Temporality, a aggregation.Histogram) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() boundaries, counts, err := histogramValues(a) @@ -546,7 +546,7 @@ func histogramPoint(record export.Record, ek export.ExportKind, a aggregation.Hi Unit: string(desc.Unit()), Data: &metricpb.Metric_Histogram{ Histogram: &metricpb.Histogram{ - AggregationTemporality: exportKindToTemporality(ek), + AggregationTemporality: sdkTemporalityToTemporality(temporality), DataPoints: []*metricpb.HistogramDataPoint{ { Sum: sum.CoerceToFloat64(desc.NumberKind()), diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go index d697ae398fb..c3454659f45 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go @@ -190,7 +190,7 @@ func TestSumIntDataPoints(t *testing.T) { value, err := ckpt.Sum() require.NoError(t, err) - if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true); assert.NoError(t, err) { + if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true); assert.NoError(t, err) { assert.Nil(t, m.GetGauge()) assert.Equal(t, &metricpb.Sum{ AggregationTemporality: otelCumulative, @@ -229,7 +229,7 @@ func TestSumFloatDataPoints(t *testing.T) { value, err := ckpt.Sum() require.NoError(t, err) - if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), export.DeltaExportKind, false); assert.NoError(t, err) { + if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.DeltaTemporality, false); assert.NoError(t, err) { assert.Nil(t, m.GetGauge()) assert.Equal(t, &metricpb.Sum{ IsMonotonic: false, @@ -367,7 +367,7 @@ func TestSumErrUnknownValueType(t *testing.T) { value, err := s.Sum() require.NoError(t, err) - _, err = sumPoint(record, value, record.StartTime(), record.EndTime(), export.CumulativeExportKind, true) + _, err = sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true) assert.Error(t, err) if !errors.Is(err, ErrUnknownValueType) { t.Errorf("expected ErrUnknownValueType, got %v", err) @@ -451,7 +451,7 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) { kind: kind, agg: agg, } - return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd)) + return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd)) } mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0]) @@ -483,7 +483,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) { makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) { desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind) labels := attribute.NewSet() - return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd)) + return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd)) } errEx := fmt.Errorf("timeout") diff --git 
a/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go index a1328312781..fc9adac15ab 100644 --- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go +++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" - exportmetric "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -40,7 +40,7 @@ import ( // themselves. func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter, mcMetrics Collector) { selector := simple.NewWithInexpensiveDistribution() - proc := processor.NewFactory(selector, exportmetric.StatelessExportKindSelector()) + proc := processor.NewFactory(selector, aggregation.StatelessTemporalitySelector()) cont := controller.New(proc, controller.WithExporter(exp)) require.NoError(t, cont.Start(ctx)) diff --git a/exporters/otlp/otlpmetric/options.go b/exporters/otlp/otlpmetric/options.go index 54ce1d0df79..dab33127be6 100644 --- a/exporters/otlp/otlpmetric/options.go +++ b/exporters/otlp/otlpmetric/options.go @@ -14,7 +14,7 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" -import metricsdk "go.opentelemetry.io/otel/sdk/export/metric" +import "go.opentelemetry.io/otel/sdk/export/metric/aggregation" // Option are setting options passed to an Exporter on creation. type Option interface { @@ -28,15 +28,15 @@ func (fn exporterOptionFunc) apply(cfg *config) { } type config struct { - exportKindSelector metricsdk.ExportKindSelector + temporalitySelector aggregation.TemporalitySelector } -// WithMetricExportKindSelector defines the ExportKindSelector used -// for selecting AggregationTemporality (i.e., Cumulative vs. Delta +// WithMetricAggregationTemporalitySelector defines the aggregation.TemporalitySelector used +// for selecting aggregation.Temporality (i.e., Cumulative vs. Delta // aggregation). If not specified otherwise, exporter will use a -// cumulative export kind selector. -func WithMetricExportKindSelector(selector metricsdk.ExportKindSelector) Option { +// cumulative temporality selector. +func WithMetricAggregationTemporalitySelector(selector aggregation.TemporalitySelector) Option { return exporterOptionFunc(func(cfg *config) { - cfg.exportKindSelector = selector + cfg.temporalitySelector = selector }) } diff --git a/exporters/prometheus/prometheus.go b/exporters/prometheus/prometheus.go index f191aa43d2b..9fc276caafc 100644 --- a/exporters/prometheus/prometheus.go +++ b/exporters/prometheus/prometheus.go @@ -132,9 +132,9 @@ func (e *Exporter) Controller() *controller.Controller { return e.controller } -// ExportKindFor implements ExportKindSelector. -func (e *Exporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) export.ExportKind { - return export.CumulativeExportKindSelector().ExportKindFor(desc, kind) +// TemporalityFor implements TemporalitySelector. +func (e *Exporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality { + return aggregation.CumulativeTemporalitySelector().TemporalityFor(desc, kind) } // ServeHTTP implements http.Handler. 
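Every exporter touched by this change migrates the same way: the old ExportKindFor method becomes TemporalityFor and delegates to one of the new built-in selectors, as the Prometheus exporter hunk above shows. A minimal sketch of that method on a hypothetical user-written exporter, assuming only packages already imported in the hunks above (metric/sdkapi and sdk/export/metric/aggregation); the exporter type here is illustrative, not part of this change:

package example

import (
	"go.opentelemetry.io/otel/metric/sdkapi"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

// exporter stands in for a user-written metric exporter; only the
// temporality plumbing is shown.
type exporter struct{}

// TemporalityFor satisfies aggregation.TemporalitySelector, the sub-interface
// the SDK Exporter interface now embeds, by delegating to a built-in selector.
func (exporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality {
	return aggregation.StatelessTemporalitySelector().TemporalityFor(desc, kind)
}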
diff --git a/exporters/prometheus/prometheus_test.go b/exporters/prometheus/prometheus_test.go index f1b217541fb..5efdba33469 100644 --- a/exporters/prometheus/prometheus_test.go +++ b/exporters/prometheus/prometheus_test.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" - export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -88,7 +88,7 @@ func newPipeline(config prometheus.Config, options ...controller.Option) (*prome selector.NewWithHistogramDistribution( histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries), ), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), processor.WithMemory(true), ), options..., diff --git a/exporters/stdout/stdoutmetric/metric.go b/exporters/stdout/stdoutmetric/metric.go index 9fd6b28d154..07333c64fb7 100644 --- a/exporters/stdout/stdoutmetric/metric.go +++ b/exporters/stdout/stdoutmetric/metric.go @@ -47,8 +47,8 @@ type line struct { Timestamp *time.Time `json:"Timestamp,omitempty"` } -func (e *metricExporter) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) exportmetric.ExportKind { - return exportmetric.StatelessExportKindSelector().ExportKindFor(desc, kind) +func (e *metricExporter) TemporalityFor(desc *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality { + return aggregation.StatelessTemporalitySelector().TemporalityFor(desc, kind) } func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reader exportmetric.InstrumentationLibraryReader) error { diff --git a/exporters/stdout/stdoutmetric/metric_test.go b/exporters/stdout/stdoutmetric/metric_test.go index 85ae1f3fb8e..395df09c02f 100644 --- a/exporters/stdout/stdoutmetric/metric_test.go +++ b/exporters/stdout/stdoutmetric/metric_test.go @@ -30,7 +30,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" @@ -61,7 +61,7 @@ func newFixtureWithResource(t *testing.T, res *resource.Resource, opts ...stdout t.Fatal("Error building fixture: ", err) } aggSel := processortest.AggregatorSelector() - proc := processor.NewFactory(aggSel, export.StatelessExportKindSelector()) + proc := processor.NewFactory(aggSel, aggregation.StatelessTemporalitySelector()) cont := controller.New(proc, controller.WithExporter(exp), controller.WithResource(res), @@ -87,7 +87,7 @@ func (fix testFixture) Output() string { func TestStdoutTimestamp(t *testing.T) { var buf bytes.Buffer aggSel := processortest.AggregatorSelector() - proc := processor.NewFactory(aggSel, export.CumulativeExportKindSelector()) + proc := processor.NewFactory(aggSel, aggregation.CumulativeTemporalitySelector()) exporter, err := stdoutmetric.New( stdoutmetric.WithWriter(&buf), ) diff --git a/internal/tools/go.sum b/internal/tools/go.sum index 46253fb418f..abd35fb0fd2 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -401,6 +401,7 @@ github.com/imdario/mergo v0.3.12 
h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/itchyny/go-flags v1.5.0 h1:Z5q2ist2sfDjDlExVPBrMqlsEDxDR2h4zuOElB0OEYI= github.com/itchyny/go-flags v1.5.0/go.mod h1:lenkYuCobuxLBAd/HGFE4LRoW8D3B6iXRQfWYJ+MNbA= github.com/itchyny/gojq v0.12.5 h1:6SJ1BQ1VAwJAlIvLSIZmqHP/RUEq3qfVWvsRxrqhsD0= github.com/itchyny/gojq v0.12.5/go.mod h1:3e1hZXv+Kwvdp6V9HXpVrvddiHVApi5EDZwS+zLFeiE= diff --git a/sdk/export/metric/aggregation/temporality.go b/sdk/export/metric/aggregation/temporality.go new file mode 100644 index 00000000000..4a4a733aa28 --- /dev/null +++ b/sdk/export/metric/aggregation/temporality.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate stringer -type=Temporality + +package aggregation // import "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + +import ( + "go.opentelemetry.io/otel/metric/sdkapi" +) + +// Temporality indicates the temporal aggregation exported by an exporter. +// These bits may be OR-d together when multiple exporters are in use. +type Temporality uint8 + +const ( + // CumulativeTemporality indicates that an Exporter expects a + // Cumulative Aggregation. + CumulativeTemporality Temporality = 1 + + // DeltaTemporality indicates that an Exporter expects a + // Delta Aggregation. + DeltaTemporality Temporality = 2 +) + +// Includes returns whether t includes support for the other temporality. +func (t Temporality) Includes(other Temporality) bool { + return t&other != 0 +} + +// MemoryRequired returns whether an exporter of this temporality requires +// memory to export correctly. +func (t Temporality) MemoryRequired(mkind sdkapi.InstrumentKind) bool { + switch mkind { + case sdkapi.HistogramInstrumentKind, sdkapi.GaugeObserverInstrumentKind, + sdkapi.CounterInstrumentKind, sdkapi.UpDownCounterInstrumentKind: + // Delta-oriented instruments: + return t.Includes(CumulativeTemporality) + + case sdkapi.CounterObserverInstrumentKind, sdkapi.UpDownCounterObserverInstrumentKind: + // Cumulative-oriented instruments: + return t.Includes(DeltaTemporality) + } + // Something unexpected is happening--we could panic. This + // will become an error when the exporter tries to access a + // checkpoint, presumably, so let it be. + return false +} + +type ( + constantTemporalitySelector Temporality + statelessTemporalitySelector struct{} +) + +var ( + _ TemporalitySelector = constantTemporalitySelector(0) + _ TemporalitySelector = statelessTemporalitySelector{} +) + +// ConstantTemporalitySelector returns a TemporalitySelector that returns +// a constant Temporality.
+func ConstantTemporalitySelector(t Temporality) TemporalitySelector { + return constantTemporalitySelector(t) +} + +// CumulativeTemporalitySelector returns a TemporalitySelector that +// always returns CumulativeTemporality. +func CumulativeTemporalitySelector() TemporalitySelector { + return ConstantTemporalitySelector(CumulativeTemporality) +} + +// DeltaTemporalitySelector returns a TemporalitySelector that +// always returns DeltaTemporality. +func DeltaTemporalitySelector() TemporalitySelector { + return ConstantTemporalitySelector(DeltaTemporality) +} + +// StatelessTemporalitySelector returns a TemporalitySelector that +// always returns the Temporality that avoids long-term memory +// requirements. +func StatelessTemporalitySelector() TemporalitySelector { + return statelessTemporalitySelector{} +} + +// TemporalityFor implements TemporalitySelector. +func (c constantTemporalitySelector) TemporalityFor(_ *sdkapi.Descriptor, _ Kind) Temporality { + return Temporality(c) +} + +// TemporalityFor implements TemporalitySelector. +func (s statelessTemporalitySelector) TemporalityFor(desc *sdkapi.Descriptor, kind Kind) Temporality { + if kind == SumKind && desc.InstrumentKind().PrecomputedSum() { + return CumulativeTemporality + } + return DeltaTemporality +} + +// TemporalitySelector is a sub-interface of Exporter used to indicate +// whether the Processor should compute Delta or Cumulative +// Aggregations. +type TemporalitySelector interface { + // TemporalityFor should return the correct Temporality that + // should be used when exporting data for the given metric + // instrument and Aggregator kind. + TemporalityFor(descriptor *sdkapi.Descriptor, aggregationKind Kind) Temporality +} diff --git a/sdk/export/metric/aggregation/temporality_string.go b/sdk/export/metric/aggregation/temporality_string.go new file mode 100644 index 00000000000..c37ddb380d5 --- /dev/null +++ b/sdk/export/metric/aggregation/temporality_string.go @@ -0,0 +1,25 @@ +// Code generated by "stringer -type=Temporality"; DO NOT EDIT. + +package aggregation // import "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[CumulativeTemporality-1] + _ = x[DeltaTemporality-2] +} + +const _Temporality_name = "CumulativeTemporalityDeltaTemporality" + +var _Temporality_index = [...]uint8{0, 21, 37} + +func (i Temporality) String() string { + i -= 1 + if i >= Temporality(len(_Temporality_index)-1) { + return "Temporality(" + strconv.FormatInt(int64(i+1), 10) + ")" + } + return _Temporality_name[_Temporality_index[i]:_Temporality_index[i+1]] +} diff --git a/sdk/export/metric/aggregation/temporality_test.go b/sdk/export/metric/aggregation/temporality_test.go new file mode 100644 index 00000000000..d5d73e9d049 --- /dev/null +++ b/sdk/export/metric/aggregation/temporality_test.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregation + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/metric/metrictest" + "go.opentelemetry.io/otel/metric/number" + "go.opentelemetry.io/otel/metric/sdkapi" +) + +func TestTemporalityIncludes(t *testing.T) { + require.True(t, CumulativeTemporality.Includes(CumulativeTemporality)) + require.True(t, DeltaTemporality.Includes(CumulativeTemporality|DeltaTemporality)) +} + +var deltaMemoryTemporalties = []sdkapi.InstrumentKind{ + sdkapi.CounterObserverInstrumentKind, + sdkapi.UpDownCounterObserverInstrumentKind, +} + +var cumulativeMemoryTemporalties = []sdkapi.InstrumentKind{ + sdkapi.HistogramInstrumentKind, + sdkapi.GaugeObserverInstrumentKind, + sdkapi.CounterInstrumentKind, + sdkapi.UpDownCounterInstrumentKind, +} + +func TestTemporalityMemoryRequired(t *testing.T) { + for _, kind := range deltaMemoryTemporalties { + require.True(t, DeltaTemporality.MemoryRequired(kind)) + require.False(t, CumulativeTemporality.MemoryRequired(kind)) + } + + for _, kind := range cumulativeMemoryTemporalties { + require.True(t, CumulativeTemporality.MemoryRequired(kind)) + require.False(t, DeltaTemporality.MemoryRequired(kind)) + } +} + +func TestTemporalitySelectors(t *testing.T) { + cAggTemp := CumulativeTemporalitySelector() + dAggTemp := DeltaTemporalitySelector() + sAggTemp := StatelessTemporalitySelector() + + for _, ikind := range append(deltaMemoryTemporalties, cumulativeMemoryTemporalties...) { + desc := metrictest.NewDescriptor("instrument", ikind, number.Int64Kind) + + var akind Kind + if ikind.Adding() { + akind = SumKind + } else { + akind = HistogramKind + } + require.Equal(t, CumulativeTemporality, cAggTemp.TemporalityFor(&desc, akind)) + require.Equal(t, DeltaTemporality, dAggTemp.TemporalityFor(&desc, akind)) + require.False(t, sAggTemp.TemporalityFor(&desc, akind).MemoryRequired(ikind)) + } +} diff --git a/sdk/export/metric/exportkind_string.go b/sdk/export/metric/exportkind_string.go deleted file mode 100644 index 3a04abdd575..00000000000 --- a/sdk/export/metric/exportkind_string.go +++ /dev/null @@ -1,25 +0,0 @@ -// Code generated by "stringer -type=ExportKind"; DO NOT EDIT. - -package metric // import "go.opentelemetry.io/otel/sdk/export/metric" - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[CumulativeExportKind-1] - _ = x[DeltaExportKind-2] -} - -const _ExportKind_name = "CumulativeExportKindDeltaExportKind" - -var _ExportKind_index = [...]uint8{0, 20, 35} - -func (i ExportKind) String() string { - i -= 1 - if i < 0 || i >= ExportKind(len(_ExportKind_index)-1) { - return "ExportKind(" + strconv.FormatInt(int64(i+1), 10) + ")" - } - return _ExportKind_name[_ExportKind_index[i]:_ExportKind_index[i+1]] -} diff --git a/sdk/export/metric/exportkind_test.go b/sdk/export/metric/exportkind_test.go deleted file mode 100644 index 2d8c1602841..00000000000 --- a/sdk/export/metric/exportkind_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel/metric/metrictest" - "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/metric/sdkapi" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" -) - -func TestExportKindIncludes(t *testing.T) { - require.True(t, CumulativeExportKind.Includes(CumulativeExportKind)) - require.True(t, DeltaExportKind.Includes(CumulativeExportKind|DeltaExportKind)) -} - -var deltaMemoryKinds = []sdkapi.InstrumentKind{ - sdkapi.CounterObserverInstrumentKind, - sdkapi.UpDownCounterObserverInstrumentKind, -} - -var cumulativeMemoryKinds = []sdkapi.InstrumentKind{ - sdkapi.HistogramInstrumentKind, - sdkapi.GaugeObserverInstrumentKind, - sdkapi.CounterInstrumentKind, - sdkapi.UpDownCounterInstrumentKind, -} - -func TestExportKindMemoryRequired(t *testing.T) { - for _, kind := range deltaMemoryKinds { - require.True(t, DeltaExportKind.MemoryRequired(kind)) - require.False(t, CumulativeExportKind.MemoryRequired(kind)) - } - - for _, kind := range cumulativeMemoryKinds { - require.True(t, CumulativeExportKind.MemoryRequired(kind)) - require.False(t, DeltaExportKind.MemoryRequired(kind)) - } -} - -func TestExportKindSelectors(t *testing.T) { - ceks := CumulativeExportKindSelector() - deks := DeltaExportKindSelector() - seks := StatelessExportKindSelector() - - for _, ikind := range append(deltaMemoryKinds, cumulativeMemoryKinds...) { - desc := metrictest.NewDescriptor("instrument", ikind, number.Int64Kind) - - var akind aggregation.Kind - if ikind.Adding() { - akind = aggregation.SumKind - } else { - akind = aggregation.HistogramKind - } - require.Equal(t, CumulativeExportKind, ceks.ExportKindFor(&desc, akind)) - require.Equal(t, DeltaExportKind, deks.ExportKindFor(&desc, akind)) - require.False(t, seks.ExportKindFor(&desc, akind).MemoryRequired(ikind)) - } -} diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index a9a77415f5b..f077f74013f 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate stringer -type=ExportKind - package metric // import "go.opentelemetry.io/otel/sdk/export/metric" import ( @@ -219,20 +217,10 @@ type Exporter interface { // Processor that just completed collection. Export(ctx context.Context, resource *resource.Resource, reader InstrumentationLibraryReader) error - // ExportKindSelector is an interface used by the Processor + // TemporalitySelector is an interface used by the Processor // in deciding whether to compute Delta or Cumulative // Aggregations when passing Records to this Exporter. - ExportKindSelector -} - -// ExportKindSelector is a sub-interface of Exporter used to indicate -// whether the Processor should compute Delta or Cumulative -// Aggregations. -type ExportKindSelector interface { - // ExportKindFor should return the correct ExportKind that - // should be used when exporting data for the given metric - // instrument and Aggregator kind. 
- ExportKindFor(descriptor *sdkapi.Descriptor, aggregatorKind aggregation.Kind) ExportKind + aggregation.TemporalitySelector } // InstrumentationLibraryReader is an interface for exporters to iterate @@ -254,7 +242,7 @@ type Reader interface { // period. Each aggregated checkpoint returned by the // function parameter may return an error. // - // The ExportKindSelector argument is used to determine + // The TemporalitySelector argument is used to determine // whether the Record is computed using Delta or Cumulative // aggregation. // @@ -262,7 +250,7 @@ type Reader interface { // expected from the Meter implementation. Any other kind // of error will immediately halt ForEach and return // the error to the caller. - ForEach(kindSelector ExportKindSelector, recordFunc func(Record) error) error + ForEach(tempSelector aggregation.TemporalitySelector, recordFunc func(Record) error) error // Locker supports locking the checkpoint set. Collection // into the checkpoint set cannot take place (in case of a @@ -364,90 +352,3 @@ func (r Record) StartTime() time.Time { func (r Record) EndTime() time.Time { return r.end } - -// ExportKind indicates the kind of data exported by an exporter. -// These bits may be OR-d together when multiple exporters are in use. -type ExportKind int - -const ( - // CumulativeExportKind indicates that an Exporter expects a - // Cumulative Aggregation. - CumulativeExportKind ExportKind = 1 - - // DeltaExportKind indicates that an Exporter expects a - // Delta Aggregation. - DeltaExportKind ExportKind = 2 -) - -// Includes tests whether `kind` includes a specific kind of -// exporter. -func (kind ExportKind) Includes(has ExportKind) bool { - return kind&has != 0 -} - -// MemoryRequired returns whether an exporter of this kind requires -// memory to export correctly. -func (kind ExportKind) MemoryRequired(mkind sdkapi.InstrumentKind) bool { - switch mkind { - case sdkapi.HistogramInstrumentKind, sdkapi.GaugeObserverInstrumentKind, - sdkapi.CounterInstrumentKind, sdkapi.UpDownCounterInstrumentKind: - // Delta-oriented instruments: - return kind.Includes(CumulativeExportKind) - - case sdkapi.CounterObserverInstrumentKind, sdkapi.UpDownCounterObserverInstrumentKind: - // Cumulative-oriented instruments: - return kind.Includes(DeltaExportKind) - } - // Something unexpected is happening--we could panic. This - // will become an error when the exporter tries to access a - // checkpoint, presumably, so let it be. - return false -} - -type ( - constantExportKindSelector ExportKind - statelessExportKindSelector struct{} -) - -var ( - _ ExportKindSelector = constantExportKindSelector(0) - _ ExportKindSelector = statelessExportKindSelector{} -) - -// ConstantExportKindSelector returns an ExportKindSelector that returns -// a constant ExportKind, one that is either always cumulative or always delta. -func ConstantExportKindSelector(kind ExportKind) ExportKindSelector { - return constantExportKindSelector(kind) -} - -// CumulativeExportKindSelector returns an ExportKindSelector that -// always returns CumulativeExportKind. -func CumulativeExportKindSelector() ExportKindSelector { - return ConstantExportKindSelector(CumulativeExportKind) -} - -// DeltaExportKindSelector returns an ExportKindSelector that -// always returns DeltaExportKind. 
-func DeltaExportKindSelector() ExportKindSelector { - return ConstantExportKindSelector(DeltaExportKind) -} - -// StatelessExportKindSelector returns an ExportKindSelector that -// always returns the ExportKind that avoids long-term memory -// requirements. -func StatelessExportKindSelector() ExportKindSelector { - return statelessExportKindSelector{} -} - -// ExportKindFor implements ExportKindSelector. -func (c constantExportKindSelector) ExportKindFor(_ *sdkapi.Descriptor, _ aggregation.Kind) ExportKind { - return ExportKind(c) -} - -// ExportKindFor implements ExportKindSelector. -func (s statelessExportKindSelector) ExportKindFor(desc *sdkapi.Descriptor, kind aggregation.Kind) ExportKind { - if kind == aggregation.SumKind && desc.InstrumentKind().PrecomputedSum() { - return CumulativeExportKind - } - return DeltaExportKind -} diff --git a/sdk/metric/controller/basic/controller_test.go b/sdk/metric/controller/basic/controller_test.go index dd90e7c0b7c..870e65f2c0c 100644 --- a/sdk/metric/controller/basic/controller_test.go +++ b/sdk/metric/controller/basic/controller_test.go @@ -45,7 +45,7 @@ func getMap(t *testing.T, cont *controller.Controller) map[string]float64 { require.NoError(t, cont.ForEach( func(_ instrumentation.Library, reader export.Reader) error { return reader.ForEach( - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), func(record export.Record) error { return out.AddRecord(record) }, @@ -115,7 +115,7 @@ func TestControllerUsesResource(t *testing.T) { } for _, c := range cases { t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) { - sel := export.CumulativeExportKindSelector() + sel := aggregation.CumulativeTemporalitySelector() exp := processortest.New(sel, attribute.DefaultEncoder()) cont := controller.New( processor.NewFactory( @@ -145,7 +145,7 @@ func TestStartNoExporter(t *testing.T) { cont := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), ), controller.WithCollectPeriod(time.Second), controller.WithResource(resource.Empty()), @@ -214,7 +214,7 @@ func TestObserverCanceled(t *testing.T) { cont := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), ), controller.WithCollectPeriod(0), controller.WithCollectTimeout(time.Millisecond), @@ -246,7 +246,7 @@ func TestObserverContext(t *testing.T) { cont := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), ), controller.WithCollectTimeout(0), controller.WithResource(resource.Empty()), @@ -278,7 +278,7 @@ type blockingExporter struct { func newBlockingExporter() *blockingExporter { return &blockingExporter{ exporter: processortest.New( - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), attribute.DefaultEncoder(), ), } @@ -296,11 +296,8 @@ func (b *blockingExporter) Export(ctx context.Context, res *resource.Resource, o return err } -func (*blockingExporter) ExportKindFor( - *sdkapi.Descriptor, - aggregation.Kind, -) export.ExportKind { - return export.CumulativeExportKind +func (*blockingExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality { + return aggregation.CumulativeTemporality } func TestExportTimeout(t *testing.T) { @@ -308,7 +305,7 @@ func TestExportTimeout(t *testing.T) { 
cont := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), ), controller.WithCollectPeriod(time.Second), controller.WithPushTimeout(time.Millisecond), @@ -357,7 +354,7 @@ func TestExportTimeout(t *testing.T) { func TestCollectAfterStopThenStartAgain(t *testing.T) { exp := processortest.New( - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), attribute.DefaultEncoder(), ) cont := controller.New( @@ -436,7 +433,7 @@ func TestCollectAfterStopThenStartAgain(t *testing.T) { func TestRegistryFunction(t *testing.T) { exp := processortest.New( - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), attribute.DefaultEncoder(), ) cont := controller.New( diff --git a/sdk/metric/controller/basic/pull_test.go b/sdk/metric/controller/basic/pull_test.go index 04c25c23571..6e87c6f6a97 100644 --- a/sdk/metric/controller/basic/pull_test.go +++ b/sdk/metric/controller/basic/pull_test.go @@ -24,7 +24,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -36,7 +36,7 @@ func TestPullNoCollect(t *testing.T) { puller := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), processor.WithMemory(true), ), controller.WithCollectPeriod(0), @@ -51,7 +51,7 @@ func TestPullNoCollect(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := processortest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord)) + require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, @@ -61,7 +61,7 @@ func TestPullNoCollect(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord)) + require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 20, @@ -72,7 +72,7 @@ func TestPullWithCollect(t *testing.T) { puller := controller.New( processor.NewFactory( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + aggregation.CumulativeTemporalitySelector(), processor.WithMemory(true), ), controller.WithCollectPeriod(time.Second), @@ -89,7 +89,7 @@ func TestPullWithCollect(t *testing.T) { require.NoError(t, puller.Collect(ctx)) records := processortest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord)) + require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord)) require.EqualValues(t, map[string]float64{ 
"counter.sum/A=B/": 10, @@ -100,7 +100,7 @@ func TestPullWithCollect(t *testing.T) { // Cached value! require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord)) + require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, @@ -112,7 +112,7 @@ func TestPullWithCollect(t *testing.T) { // Re-computed value! require.NoError(t, puller.Collect(ctx)) records = processortest.NewOutput(attribute.DefaultEncoder()) - require.NoError(t, controllertest.ReadAll(puller, export.CumulativeExportKindSelector(), records.AddInstrumentationLibraryRecord)) + require.NoError(t, controllertest.ReadAll(puller, aggregation.CumulativeTemporalitySelector(), records.AddInstrumentationLibraryRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 20, diff --git a/sdk/metric/controller/basic/push_test.go b/sdk/metric/controller/basic/push_test.go index 0b1b5474f3b..775754774bf 100644 --- a/sdk/metric/controller/basic/push_test.go +++ b/sdk/metric/controller/basic/push_test.go @@ -67,7 +67,7 @@ func init() { func newExporter() *processortest.Exporter { return processortest.New( - export.StatelessExportKindSelector(), + aggregation.StatelessTemporalitySelector(), attribute.DefaultEncoder(), ) } diff --git a/sdk/metric/controller/controllertest/test.go b/sdk/metric/controller/controllertest/test.go index d4b8d3a3299..8676129ebe5 100644 --- a/sdk/metric/controller/controllertest/test.go +++ b/sdk/metric/controller/controllertest/test.go @@ -20,6 +20,7 @@ import ( "github.com/benbjohnson/clock" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/instrumentation" controllerTime "go.opentelemetry.io/otel/sdk/metric/controller/time" ) @@ -64,7 +65,7 @@ func (t MockTicker) C() <-chan time.Time { // metric). func ReadAll( reader export.InstrumentationLibraryReader, - kind export.ExportKindSelector, + kind aggregation.TemporalitySelector, apply func(instrumentation.Library, export.Record) error, ) error { return reader.ForEach(func(library instrumentation.Library, reader export.Reader) error { diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index c01509faf04..7e2fd26320a 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -28,7 +28,7 @@ import ( type ( Processor struct { - export.ExportKindSelector + aggregation.TemporalitySelector export.AggregatorSelector state @@ -118,32 +118,32 @@ var _ export.Reader = &state{} // ErrInconsistentState is returned when the sequence of collection's starts and finishes are incorrectly balanced. var ErrInconsistentState = fmt.Errorf("inconsistent processor state") -// ErrInvalidExportKind is returned for unknown metric.ExportKind. -var ErrInvalidExportKind = fmt.Errorf("invalid export kind") +// ErrInvalidTemporality is returned for unknown metric.Temporality. +var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality") // New returns a basic Processor that is also a Checkpointer using the provided -// AggregatorSelector to select Aggregators. The ExportKindSelector +// AggregatorSelector to select Aggregators. 
The TemporalitySelector // is consulted to determine the kind(s) of exporter that will consume // data, so that this Processor can prepare to compute Delta or // Cumulative Aggregations as needed. -func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor { - return NewFactory(aselector, eselector, opts...).NewCheckpointer().(*Processor) +func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor { + return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor) } type factory struct { aselector export.AggregatorSelector - eselector export.ExportKindSelector + tselector aggregation.TemporalitySelector config config } -func NewFactory(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) export.CheckpointerFactory { +func NewFactory(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) export.CheckpointerFactory { var config config for _, opt := range opts { opt.applyProcessor(&config) } return factory{ aselector: aselector, - eselector: eselector, + tselector: tselector, config: config, } } @@ -153,8 +153,8 @@ var _ export.CheckpointerFactory = factory{} func (f factory) NewCheckpointer() export.Checkpointer { now := time.Now() p := &Processor{ - AggregatorSelector: f.aselector, - ExportKindSelector: f.eselector, + AggregatorSelector: f.aselector, + TemporalitySelector: f.tselector, state: state{ values: map[stateKey]*stateValue{}, processStart: now, @@ -181,7 +181,7 @@ func (b *Processor) Process(accum export.Accumulation) error { // Check if there is an existing value. value, ok := b.state.values[key] if !ok { - stateful := b.ExportKindFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind()) + stateful := b.TemporalityFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind()) newValue := &stateValue{ labels: accum.Labels(), @@ -227,7 +227,7 @@ func (b *Processor) Process(accum export.Accumulation) error { // instrument reports a PrecomputedSum to a DeltaExporter or // the reverse, a non-PrecomputedSum instrument with a // CumulativeExporter. This logic is encapsulated in - // ExportKind.MemoryRequired(InstrumentKind). + // Temporality.MemoryRequired(InstrumentKind). // // Case (b) occurs when the variable `sameCollection` is true, // indicating that the stateKey for Accumulation has already @@ -340,7 +340,7 @@ func (b *Processor) FinishCollection() error { // ForEach iterates through the Reader, passing an // export.Record with the appropriate Cumulative or Delta aggregation // to an exporter. -func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error { +func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.Record) error) error { if b.startedCollection != b.finishedCollection { return ErrInconsistentState } @@ -356,9 +356,9 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record continue } - ekind := exporter.ExportKindFor(key.descriptor, value.current.Aggregation().Kind()) - switch ekind { - case export.CumulativeExportKind: + aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind()) + switch aggTemp { + case aggregation.CumulativeTemporality: // If stateful, the sum has been computed. If stateless, the // input was already cumulative. 
Either way, use the checkpointed // value: @@ -369,7 +369,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record } start = b.processStart - case export.DeltaExportKind: + case aggregation.DeltaTemporality: // Precomputed sums are a special case. if mkind.PrecomputedSum() { agg = value.delta.Aggregation() @@ -379,7 +379,7 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record start = b.intervalStart default: - return fmt.Errorf("%v: %w", ekind, ErrInvalidExportKind) + return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality) } if err := f(export.NewRecord( diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 131d73b5c7d..d4d90e95050 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -47,7 +47,7 @@ func requireNotAfter(t *testing.T, t1, t2 time.Time) { // TestProcessor tests all the non-error paths in this package. func TestProcessor(t *testing.T) { type exportCase struct { - kind export.ExportKind + kind aggregation.Temporality } type instrumentCase struct { kind sdkapi.InstrumentKind @@ -60,8 +60,8 @@ func TestProcessor(t *testing.T) { } for _, tc := range []exportCase{ - {kind: export.CumulativeExportKind}, - {kind: export.DeltaExportKind}, + {kind: aggregation.CumulativeTemporality}, + {kind: aggregation.DeltaTemporality}, } { t.Run(tc.kind.String(), func(t *testing.T) { for _, ic := range []instrumentCase{ @@ -121,7 +121,7 @@ func updateFor(t *testing.T, desc *sdkapi.Descriptor, selector export.Aggregator func testProcessor( t *testing.T, - ekind export.ExportKind, + aggTemp aggregation.Temporality, mkind sdkapi.InstrumentKind, nkind number.Kind, akind aggregation.Kind, @@ -134,7 +134,7 @@ func testProcessor( labs2 := []attribute.KeyValue{attribute.String("L2", "V")} testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) { - processor := basic.New(selector, export.ConstantExportKindSelector(ekind), basic.WithMemory(hasMemory)) + processor := basic.New(selector, aggregation.ConstantTemporalitySelector(aggTemp), basic.WithMemory(hasMemory)) instSuffix := fmt.Sprint(".", strings.ToLower(akind.String())) @@ -166,7 +166,7 @@ func testProcessor( _, canSub := subr.(export.Subtractor) // Allow unsupported subraction case only when it is called for. - require.True(t, mkind.PrecomputedSum() && ekind == export.DeltaExportKind && !canSub) + require.True(t, mkind.PrecomputedSum() && aggTemp == aggregation.DeltaTemporality && !canSub) return } else if err != nil { t.Fatal("unexpected FinishCollection error: ", err) @@ -190,7 +190,7 @@ func testProcessor( // Test the final checkpoint state. records1 := processorTest.NewOutput(attribute.DefaultEncoder()) - err = reader.ForEach(export.ConstantExportKindSelector(ekind), records1.AddRecord) + err = reader.ForEach(aggregation.ConstantTemporalitySelector(aggTemp), records1.AddRecord) // Test for an allowed error: if err != nil && err != aggregation.ErrNoSubtraction { @@ -203,7 +203,7 @@ func testProcessor( // number of Accumulators, unless LastValue aggregation. // If a precomputed sum, we expect cumulative inputs. 
@@ -203,7 +203,7 @@ func testProcessor(
 		// number of Accumulators, unless LastValue aggregation.
 		// If a precomputed sum, we expect cumulative inputs.
 		if mkind.PrecomputedSum() {
-			if ekind == export.DeltaExportKind && akind != aggregation.LastValueKind {
+			if aggTemp == aggregation.DeltaTemporality && akind != aggregation.LastValueKind {
 				multiplier = int64(nAccum)
 			} else if akind == aggregation.LastValueKind {
 				multiplier = cumulativeMultiplier
@@ -211,7 +211,7 @@ func testProcessor(
 				multiplier = cumulativeMultiplier * int64(nAccum)
 			}
 		} else {
-			if ekind == export.CumulativeExportKind && akind != aggregation.LastValueKind {
+			if aggTemp == aggregation.CumulativeTemporality && akind != aggregation.LastValueKind {
 				multiplier = cumulativeMultiplier * int64(nAccum)
 			} else if akind == aggregation.LastValueKind {
 				multiplier = 1
@@ -223,7 +223,7 @@ func testProcessor(
 			// Synchronous accumulate results from multiple accumulators,
 			// use that number as the baseline multiplier.
 			multiplier = int64(nAccum)
-			if ekind == export.CumulativeExportKind {
+			if aggTemp == aggregation.CumulativeTemporality {
 				// If a cumulative exporter, include prior checkpoints.
 				multiplier *= cumulativeMultiplier
 			}
@@ -265,8 +265,8 @@ func testProcessor(
 
 type bogusExporter struct{}
 
-func (bogusExporter) ExportKindFor(*sdkapi.Descriptor, aggregation.Kind) export.ExportKind {
-	return 1000000
+func (bogusExporter) TemporalityFor(*sdkapi.Descriptor, aggregation.Kind) aggregation.Temporality {
+	return 100
 }
 
 func (bogusExporter) Export(context.Context, export.Reader) error {
@@ -275,39 +275,39 @@ func (bogusExporter) Export(context.Context, export.Reader) error {
 
 func TestBasicInconsistent(t *testing.T) {
 	// Test double-start
-	b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 
 	b.StartCollection()
 	b.StartCollection()
 	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
 
 	// Test finish without start
-	b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 
 	require.Equal(t, basic.ErrInconsistentState, b.FinishCollection())
 
 	// Test no finish
-	b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 	b.StartCollection()
 	require.Equal(
 		t,
 		basic.ErrInconsistentState,
 		b.ForEach(
-			export.StatelessExportKindSelector(),
+			aggregation.StatelessTemporalitySelector(),
 			func(export.Record) error { return nil },
 		),
 	)
 
 	// Test no start
-	b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 
 	desc := metrictest.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind)
 	accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{})
 	require.Equal(t, basic.ErrInconsistentState, b.Process(accum))
 
 	// Test invalid kind:
-	b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b = basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 	b.StartCollection()
 	require.NoError(t, b.Process(accum))
 	require.NoError(t, b.FinishCollection())
@@ -316,14 +316,14 @@ func TestBasicInconsistent(t *testing.T) {
 		bogusExporter{},
 		func(export.Record) error { return nil },
 	)
-	require.True(t, errors.Is(err, basic.ErrInvalidExportKind))
+	require.True(t, errors.Is(err, basic.ErrInvalidTemporality))
 }
 
 func TestBasicTimestamps(t *testing.T) {
 	beforeNew := time.Now()
 	time.Sleep(time.Nanosecond)
-	b := basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector())
+	b := basic.New(processorTest.AggregatorSelector(), aggregation.StatelessTemporalitySelector())
 	time.Sleep(time.Nanosecond)
 	afterNew := time.Now()
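The cumulative and delta branches above mirror the processor's own stateful check, Temporality.MemoryRequired, used in basic.go earlier in this diff. A small sketch of that predicate; the printed values reflect the behaviour described in the processor comments for a synchronous counter and are illustrative rather than normative:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/metric/sdkapi"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

func main() {
	// A cumulative exporter consuming a synchronous counter needs memory,
	// since per-collection deltas have to be summed across checkpoints.
	fmt.Println(aggregation.CumulativeTemporality.MemoryRequired(sdkapi.CounterInstrumentKind))
	// A delta exporter consuming the same counter does not.
	fmt.Println(aggregation.DeltaTemporality.MemoryRequired(sdkapi.CounterInstrumentKind))
}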
@@ -336,7 +336,7 @@ func TestBasicTimestamps(t *testing.T) {
 
 	var start1, end1 time.Time
 
-	require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error {
+	require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
 		start1 = rec.StartTime()
 		end1 = rec.EndTime()
 		return nil
@@ -353,7 +353,7 @@ func TestBasicTimestamps(t *testing.T) {
 
 	var start2, end2 time.Time
 
-	require.NoError(t, b.ForEach(export.StatelessExportKindSelector(), func(rec export.Record) error {
+	require.NoError(t, b.ForEach(aggregation.StatelessTemporalitySelector(), func(rec export.Record) error {
 		start2 = rec.StartTime()
 		end2 = rec.EndTime()
 		return nil
@@ -370,12 +370,12 @@ func TestBasicTimestamps(t *testing.T) {
 }
 
 func TestStatefulNoMemoryCumulative(t *testing.T) {
-	ekindSel := export.CumulativeExportKindSelector()
+	aggTempSel := aggregation.CumulativeTemporalitySelector()
 
 	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind)
 	selector := processorTest.AggregatorSelector()
 
-	processor := basic.New(selector, ekindSel, basic.WithMemory(false))
+	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
 	reader := processor.Reader()
 
 	for i := 1; i < 3; i++ {
@@ -385,7 +385,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 
 		// Verify zero elements
 		records := processorTest.NewOutput(attribute.DefaultEncoder())
-		require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
+		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
 		require.EqualValues(t, map[string]float64{}, records.Map())
 
 		// Add 10
@@ -395,7 +395,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 
 		// Verify one element
 		records = processorTest.NewOutput(attribute.DefaultEncoder())
-		require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
+		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
 		require.EqualValues(t, map[string]float64{
 			"inst.sum/A=B/": float64(i * 10),
 		}, records.Map())
@@ -403,12 +403,12 @@ func TestStatefulNoMemoryCumulative(t *testing.T) {
 }
 
 func TestStatefulNoMemoryDelta(t *testing.T) {
-	ekindSel := export.DeltaExportKindSelector()
+	aggTempSel := aggregation.DeltaTemporalitySelector()
 
 	desc := metrictest.NewDescriptor("inst.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
 	selector := processorTest.AggregatorSelector()
 
-	processor := basic.New(selector, ekindSel, basic.WithMemory(false))
+	processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
 	reader := processor.Reader()
 
 	for i := 1; i < 3; i++ {
@@ -418,7 +418,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 
 		// Verify zero elements
 		records := processorTest.NewOutput(attribute.DefaultEncoder())
-		require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
+		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
 		require.EqualValues(t, map[string]float64{}, records.Map())
 
 		// Add 10
@@ -428,7 +428,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 
 		// Verify one element
 		records = processorTest.NewOutput(attribute.DefaultEncoder())
-		require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
+		require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
 		require.EqualValues(t, map[string]float64{
 			"inst.sum/A=B/": 10,
 		}, records.Map())
@@ -436,15 +436,15 @@ func TestStatefulNoMemoryDelta(t *testing.T) {
 }
 
 func TestMultiObserverSum(t *testing.T) {
-	for _, ekindSel := range []export.ExportKindSelector{
-		export.CumulativeExportKindSelector(),
-		export.DeltaExportKindSelector(),
+	for _, aggTempSel := range []aggregation.TemporalitySelector{
+		aggregation.CumulativeTemporalitySelector(),
+		aggregation.DeltaTemporalitySelector(),
 	} {
 
 		desc := metrictest.NewDescriptor("observe.sum", sdkapi.CounterObserverInstrumentKind, number.Int64Kind)
 		selector := processorTest.AggregatorSelector()
 
-		processor := basic.New(selector, ekindSel, basic.WithMemory(false))
+		processor := basic.New(selector, aggTempSel, basic.WithMemory(false))
 		reader := processor.Reader()
 
 		for i := 1; i < 3; i++ {
@@ -457,13 +457,13 @@ func TestMultiObserverSum(t *testing.T) {
 
 			// Multiplier is 1 for deltas, otherwise i.
 			multiplier := i
-			if ekindSel.ExportKindFor(&desc, aggregation.SumKind) == export.DeltaExportKind {
+			if aggTempSel.TemporalityFor(&desc, aggregation.SumKind) == aggregation.DeltaTemporality {
 				multiplier = 1
 			}
 
 			// Verify one element
 			records := processorTest.NewOutput(attribute.DefaultEncoder())
-			require.NoError(t, reader.ForEach(ekindSel, records.AddRecord))
+			require.NoError(t, reader.ForEach(aggTempSel, records.AddRecord))
 			require.EqualValues(t, map[string]float64{
 				"observe.sum/A=B/": float64(3 * 10 * multiplier),
 			}, records.Map())
@@ -473,7 +473,7 @@ func TestMultiObserverSum(t *testing.T) {
 
 func TestCounterObserverEndToEnd(t *testing.T) {
 	ctx := context.Background()
-	eselector := export.CumulativeExportKindSelector()
+	eselector := aggregation.CumulativeTemporalitySelector()
 	proc := basic.New(
 		processorTest.AggregatorSelector(),
 		eselector,
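The processortest changes that follow satisfy the exporter-side TemporalityFor requirement by embedding a selector rather than implementing the method by hand. A minimal sketch of that pattern; the type name is made up for illustration, and the Export signature mirrors the bogusExporter test double above:

package main

import (
	"context"

	export "go.opentelemetry.io/otel/sdk/export/metric"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)

// sketchExporter embeds a TemporalitySelector, so the promoted TemporalityFor
// method reports which temporality this exporter wants for each instrument
// without any extra code.
type sketchExporter struct {
	aggregation.TemporalitySelector
}

func (sketchExporter) Export(context.Context, export.Reader) error {
	// A real exporter would iterate the Reader here.
	return nil
}

func main() {
	e := sketchExporter{TemporalitySelector: aggregation.StatelessTemporalitySelector()}
	// e can now be passed wherever an aggregation.TemporalitySelector is
	// expected, for example to a Reader's ForEach as in the tests above.
	var sel aggregation.TemporalitySelector = e
	_ = sel
}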
diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go
index 15488f3370e..bb6f18f7e67 100644
--- a/sdk/metric/processor/processortest/test.go
+++ b/sdk/metric/processor/processortest/test.go
@@ -82,7 +82,7 @@ type (
 	// Exporter is a testing implementation of export.Exporter that
 	// assembles its results as a map[string]float64.
 	Exporter struct {
-		export.ExportKindSelector
+		aggregation.TemporalitySelector
 		output      *Output
 		exportCount int
 
@@ -230,7 +230,7 @@ func NewOutput(labelEncoder attribute.Encoder) *Output {
 }
 
 // ForEach implements export.Reader.
-func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) error) error {
+func (o *Output) ForEach(_ aggregation.TemporalitySelector, ff func(export.Record) error) error {
 	for key, value := range o.m {
 		if err := ff(export.NewRecord(
 			key.desc,
@@ -281,7 +281,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource
 // is chosen, whichever is implemented by the underlying Aggregator.
 func (o *Output) Map() map[string]float64 {
 	r := make(map[string]float64)
-	err := o.ForEach(export.StatelessExportKindSelector(), func(record export.Record) error {
+	err := o.ForEach(aggregation.StatelessTemporalitySelector(), func(record export.Record) error {
 		for key, entry := range o.m {
 			encoded := entry.labels.Encoded(o.labelEncoder)
 			rencoded := entry.resource.Encoded(o.labelEncoder)
@@ -344,10 +344,10 @@ func (o *Output) AddAccumulation(acc export.Accumulation) error {
 //
 // Where in the example A=1,B=2 is the encoded labels and R=V is the
 // encoded resource value.
-func New(selector export.ExportKindSelector, encoder attribute.Encoder) *Exporter {
+func New(selector aggregation.TemporalitySelector, encoder attribute.Encoder) *Exporter {
 	return &Exporter{
-		ExportKindSelector: selector,
-		output:             NewOutput(encoder),
+		TemporalitySelector: selector,
+		output:              NewOutput(encoder),
 	}
 }
 
@@ -356,7 +356,7 @@ func (e *Exporter) Export(_ context.Context, res *resource.Resource, ckpt export
 	defer e.output.Unlock()
 	e.exportCount++
 	return ckpt.ForEach(func(library instrumentation.Library, mr export.Reader) error {
-		return mr.ForEach(e.ExportKindSelector, func(r export.Record) error {
+		return mr.ForEach(e.TemporalitySelector, func(r export.Record) error {
 			if e.InjectErr != nil {
 				if err := e.InjectErr(r); err != nil {
 					return err
@@ -433,7 +433,7 @@ type metricReader struct {
 
 var _ export.Reader = &metricReader{}
 
-func (m *metricReader) ForEach(_ export.ExportKindSelector, fn func(export.Record) error) error {
+func (m *metricReader) ForEach(_ aggregation.TemporalitySelector, fn func(export.Record) error) error {
 	for _, record := range m.records {
 		if err := fn(record); err != nil && err != aggregation.ErrNoData {
 			return err
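A minimal sketch of constructing the test exporter with its renamed argument, as the updated test_test.go below does; the encoder choice is illustrative:

package main

import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
)

func main() {
	// New now takes an aggregation.TemporalitySelector; because Exporter
	// embeds the selector, the exporter also satisfies TemporalityFor.
	exporter := processortest.New(
		aggregation.StatelessTemporalitySelector(),
		attribute.DefaultEncoder(),
	)
	_ = exporter
}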
diff --git a/sdk/metric/processor/processortest/test_test.go b/sdk/metric/processor/processortest/test_test.go
index 8ee88b55278..2ddbafc0ec8 100644
--- a/sdk/metric/processor/processortest/test_test.go
+++ b/sdk/metric/processor/processortest/test_test.go
@@ -23,6 +23,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	export "go.opentelemetry.io/otel/sdk/export/metric"
+	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	metricsdk "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
@@ -71,7 +72,7 @@ func TestProcessorTesting(t *testing.T) {
 
 	// Export the data and validate it again.
 	exporter := processorTest.New(
-		export.StatelessExportKindSelector(),
+		aggregation.StatelessTemporalitySelector(),
 		attribute.DefaultEncoder(),
 	)
diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go
index 652a2195b3b..bac96ae2b6c 100644
--- a/sdk/metric/processor/reducer/reducer_test.go
+++ b/sdk/metric/processor/reducer/reducer_test.go
@@ -23,7 +23,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/metric/sdkapi"
-	export "go.opentelemetry.io/otel/sdk/export/metric"
+	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	metricsdk "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
@@ -91,7 +91,7 @@ func TestFilterProcessor(t *testing.T) {
 
 // Test a filter with the ../basic Processor.
 func TestFilterBasicProcessor(t *testing.T) {
-	basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExportKindSelector())
+	basicProc := basic.New(processorTest.AggregatorSelector(), aggregation.CumulativeTemporalitySelector())
 	accum := metricsdk.NewAccumulator(
 		reducer.New(testFilter{}, basicProc),
 	)
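To close the loop, a stand-alone sketch of the wiring exercised by TestFilterBasicProcessor above. The reducer wrapping from the test is omitted for brevity, and handing the basic processor straight to the accumulator is an assumption of this sketch rather than something the change requires:

package main

import (
	"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
	metricsdk "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/processor/basic"
	"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
)

func main() {
	// The basic processor takes the renamed TemporalitySelector; everything
	// downstream, including the accumulator, is untouched by this change.
	basicProc := basic.New(
		processortest.AggregatorSelector(),
		aggregation.CumulativeTemporalitySelector(),
	)
	accum := metricsdk.NewAccumulator(basicProc)
	_ = accum
}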