diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dbd7f1aca5..edbbca91146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc".WithGRPCConn` option so the exporter can reuse an existing gRPC connection. (#2002) - Added a new `schema` module to help parse Schema Files in OTEP 0152 format. (#2267) -- Added a new `MapCarrier` to the `go.opentelemetry.io/otel/propagation` package to hold propagated coss-cutting concerns as a `map[string]string` held in memory. (#2334) +- Added a new `MapCarrier` to the `go.opentelemetry.io/otel/propagation` package to hold propagated cross-cutting concerns as a `map[string]string` held in memory. (#2334) + +### Removed + +- Metric SDK removes the "exact" aggregator for histogram instruments, as it performed a non-standard aggregation for OTLP export (creating repeated Gauge points) and worked its way into a number of confusing examples. (#2348) ## [1.1.0] - 2021-10-27 diff --git a/bridge/opencensus/aggregation.go b/bridge/opencensus/aggregation.go index 44015d96cd9..10616b993aa 100644 --- a/bridge/opencensus/aggregation.go +++ b/bridge/opencensus/aggregation.go @@ -31,127 +31,109 @@ var ( errBadPoint = errors.New("point cannot be converted") ) -// aggregationWithEndTime is an aggregation that can also provide the timestamp -// of the last recorded point. -type aggregationWithEndTime interface { - aggregation.Aggregation - end() time.Time -} +type recordFunc func(agg aggregation.Aggregation, end time.Time) error -// newAggregationFromPoints creates an OpenTelemetry aggregation from -// OpenCensus points. Points may not be empty and must be either +// recordAggregationsFromPoints records one OpenTelemetry aggregation for +// each OpenCensus point. Points may not be empty and must be either // all (int|float)64 or all *metricdata.Distribution. -func newAggregationFromPoints(points []metricdata.Point) (aggregationWithEndTime, error) { +func recordAggregationsFromPoints(points []metricdata.Point, recorder recordFunc) error { if len(points) == 0 { - return nil, errEmpty + return errEmpty } switch t := points[0].Value.(type) { case int64: - return newExactAggregator(points) + return recordGaugePoints(points, recorder) case float64: - return newExactAggregator(points) + return recordGaugePoints(points, recorder) case *metricdata.Distribution: - return newDistributionAggregator(points) + return recordDistributionPoint(points, recorder) default: // TODO add *metricdata.Summary support - return nil, fmt.Errorf("%w: %v", errIncompatibleType, t) + return fmt.Errorf("%w: %v", errIncompatibleType, t) } } -var _ aggregation.Aggregation = &ocExactAggregator{} -var _ aggregation.LastValue = &ocExactAggregator{} -var _ aggregation.Points = &ocExactAggregator{} +var _ aggregation.Aggregation = &ocRawAggregator{} +var _ aggregation.LastValue = &ocRawAggregator{} -// newExactAggregator creates an OpenTelemetry aggreation from OpenCensus points. +// recordGaugePoints creates an OpenTelemetry aggregation from OpenCensus points. // Points may not be empty, and must only contain integers or floats. -func newExactAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) { - points := make([]aggregation.Point, len(pts)) - for i, pt := range pts { +func recordGaugePoints(pts []metricdata.Point, recorder recordFunc) error { + for _, pt := range pts { switch t := pt.Value.(type) { case int64: - points[i] = aggregation.Point{ - Number: number.NewInt64Number(pt.Value.(int64)), - Time: pt.Time, + if err := recorder(&ocRawAggregator{ + value: number.NewInt64Number(pt.Value.(int64)), + time: pt.Time, + }, pt.Time); err != nil { + return err } case float64: - points[i] = aggregation.Point{ - Number: number.NewFloat64Number(pt.Value.(float64)), - Time: pt.Time, + if err := recorder(&ocRawAggregator{ + value: number.NewFloat64Number(pt.Value.(float64)), + time: pt.Time, + }, pt.Time); err != nil { + return err } default: - return nil, fmt.Errorf("%w: %v", errIncompatibleType, t) + return fmt.Errorf("%w: %v", errIncompatibleType, t) } } - return &ocExactAggregator{ - points: points, - }, nil + return nil } -type ocExactAggregator struct { - points []aggregation.Point +type ocRawAggregator struct { + value number.Number + time time.Time } // Kind returns the kind of aggregation this is. -func (o *ocExactAggregator) Kind() aggregation.Kind { - return aggregation.ExactKind -} - -// Points returns access to the raw data set. -func (o *ocExactAggregator) Points() ([]aggregation.Point, error) { - return o.points, nil +func (o *ocRawAggregator) Kind() aggregation.Kind { + return aggregation.LastValueKind } // LastValue returns the last point. -func (o *ocExactAggregator) LastValue() (number.Number, time.Time, error) { - last := o.points[len(o.points)-1] - return last.Number, last.Time, nil -} - -// end returns the timestamp of the last point -func (o *ocExactAggregator) end() time.Time { - _, t, _ := o.LastValue() - return t +func (o *ocRawAggregator) LastValue() (number.Number, time.Time, error) { + return o.value, o.time, nil } var _ aggregation.Aggregation = &ocDistAggregator{} var _ aggregation.Histogram = &ocDistAggregator{} -// newDistributionAggregator creates an OpenTelemetry aggreation from +// recordDistributionPoint creates an OpenTelemetry aggregation from // OpenCensus points. Points may not be empty, and must only contain // Distributions. The most recent disribution will be used in the aggregation. -func newDistributionAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) { +func recordDistributionPoint(pts []metricdata.Point, recorder recordFunc) error { // only use the most recent datapoint for now. pt := pts[len(pts)-1] val, ok := pt.Value.(*metricdata.Distribution) if !ok { - return nil, fmt.Errorf("%w: %v", errBadPoint, pt.Value) + return fmt.Errorf("%w: %v", errBadPoint, pt.Value) } bucketCounts := make([]uint64, len(val.Buckets)) for i, bucket := range val.Buckets { if bucket.Count < 0 { - return nil, fmt.Errorf("%w: bucket count may not be negative", errBadPoint) + return fmt.Errorf("%w: bucket count may not be negative", errBadPoint) } bucketCounts[i] = uint64(bucket.Count) } if val.Count < 0 { - return nil, fmt.Errorf("%w: count may not be negative", errBadPoint) + return fmt.Errorf("%w: count may not be negative", errBadPoint) } - return &ocDistAggregator{ + return recorder(&ocDistAggregator{ sum: number.NewFloat64Number(val.Sum), count: uint64(val.Count), buckets: aggregation.Buckets{ Boundaries: val.BucketOptions.Bounds, Counts: bucketCounts, }, - endTime: pts[len(pts)-1].Time, - }, nil + }, pts[len(pts)-1].Time) } type ocDistAggregator struct { sum number.Number count uint64 buckets aggregation.Buckets - endTime time.Time } // Kind returns the kind of aggregation this is. @@ -173,8 +155,3 @@ func (o *ocDistAggregator) Count() (uint64, error) { func (o *ocDistAggregator) Histogram() (aggregation.Buckets, error) { return o.buckets, nil } - -// end returns the time the histogram was measured. -func (o *ocDistAggregator) end() time.Time { - return o.endTime -} diff --git a/bridge/opencensus/aggregation_test.go b/bridge/opencensus/aggregation_test.go index d6cf8e4944c..dbd9d69b827 100644 --- a/bridge/opencensus/aggregation_test.go +++ b/bridge/opencensus/aggregation_test.go @@ -44,7 +44,7 @@ func TestNewAggregationFromPoints(t *testing.T) { Value: int64(23), }, }, - expectedKind: aggregation.ExactKind, + expectedKind: aggregation.LastValueKind, }, { desc: "float point", @@ -54,7 +54,7 @@ func TestNewAggregationFromPoints(t *testing.T) { Value: float64(23), }, }, - expectedKind: aggregation.ExactKind, + expectedKind: aggregation.LastValueKind, }, { desc: "distribution point", @@ -129,7 +129,7 @@ func TestNewAggregationFromPoints(t *testing.T) { expectedErr: errIncompatibleType, }, { - desc: "dist is incompatible with exact", + desc: "dist is incompatible with raw points", input: []metricdata.Point{ { Time: now, @@ -178,82 +178,57 @@ func TestNewAggregationFromPoints(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - output, err := newAggregationFromPoints(tc.input) + var output []aggregation.Aggregation + err := recordAggregationsFromPoints(tc.input, func(agg aggregation.Aggregation, ts time.Time) error { + last := tc.input[len(tc.input)-1] + if ts != last.Time { + t.Errorf("incorrect timestamp %v != %v", ts, last.Time) + } + output = append(output, agg) + return nil + }) if !errors.Is(err, tc.expectedErr) { t.Errorf("newAggregationFromPoints(%v) = err(%v), want err(%v)", tc.input, err, tc.expectedErr) } - if tc.expectedErr == nil && output.Kind() != tc.expectedKind { - t.Errorf("newAggregationFromPoints(%v) = %v, want %v", tc.input, output.Kind(), tc.expectedKind) + for _, out := range output { + if tc.expectedErr == nil && out.Kind() != tc.expectedKind { + t.Errorf("newAggregationFromPoints(%v) = %v, want %v", tc.input, out.Kind(), tc.expectedKind) + } } }) } } -func TestPointsAggregation(t *testing.T) { - now := time.Now() - input := []metricdata.Point{ - {Value: int64(15)}, - {Value: int64(-23), Time: now}, - } - output, err := newAggregationFromPoints(input) - if err != nil { - t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) - } - if output.Kind() != aggregation.ExactKind { - t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind) - } - if output.end() != now { - t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) - } - pointsAgg, ok := output.(aggregation.Points) - if !ok { - t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) - } - points, err := pointsAgg.Points() - if err != nil { - t.Fatalf("Unexpected err: %v", err) - } - if len(points) != len(input) { - t.Fatalf("newAggregationFromPoints(%v) resulted in %d points, want %d points", input, len(points), len(input)) - } - for i := range points { - inputPoint := input[i] - outputPoint := points[i] - if inputPoint.Value != outputPoint.AsInt64() { - t.Errorf("newAggregationFromPoints(%v)[%d] = %v, want %v", input, i, outputPoint.AsInt64(), inputPoint.Value) - } - } -} - func TestLastValueAggregation(t *testing.T) { now := time.Now() input := []metricdata.Point{ - {Value: int64(15)}, + {Value: int64(15), Time: now.Add(-time.Minute)}, {Value: int64(-23), Time: now}, } - output, err := newAggregationFromPoints(input) - if err != nil { - t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) - } - if output.Kind() != aggregation.ExactKind { - t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind) - } - if output.end() != now { - t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) - } - lvAgg, ok := output.(aggregation.LastValue) - if !ok { - t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) - } - num, endTime, err := lvAgg.LastValue() + idx := 0 + err := recordAggregationsFromPoints(input, func(agg aggregation.Aggregation, end time.Time) error { + if agg.Kind() != aggregation.LastValueKind { + t.Errorf("recordAggregationsFromPoints(%v) = %v, want %v", input, agg.Kind(), aggregation.LastValueKind) + } + if end != input[idx].Time { + t.Errorf("recordAggregationsFromPoints(%v).end() = %v, want %v", input, end, input[idx].Time) + } + pointsLV, ok := agg.(aggregation.LastValue) + if !ok { + t.Errorf("recordAggregationsFromPoints(%v) = %v does not implement the aggregation.LastValue interface", input, agg) + } + lv, ts, _ := pointsLV.LastValue() + if lv.AsInt64() != input[idx].Value { + t.Errorf("recordAggregationsFromPoints(%v) = %v, want %v", input, lv.AsInt64(), input[idx].Value) + } + if ts != input[idx].Time { + t.Errorf("recordAggregationsFromPoints(%v) = %v, want %v", input, ts, input[idx].Time) + } + idx++ + return nil + }) if err != nil { - t.Fatalf("Unexpected err: %v", err) - } - if endTime != now { - t.Errorf("newAggregationFromPoints(%v).LastValue() = endTime: %v, want %v", input, endTime, now) - } - if num.AsInt64() != int64(-23) { - t.Errorf("newAggregationFromPoints(%v).LastValue() = number: %v, want %v", input, num.AsInt64(), int64(-23)) + t.Errorf("recordAggregationsFromPoints(%v) = unexpected error %v", input, err) } } @@ -288,33 +263,39 @@ func TestHistogramAggregation(t *testing.T) { }, }, } - output, err := newAggregationFromPoints(input) + var output aggregation.Aggregation + var end time.Time + err := recordAggregationsFromPoints(input, func(argAgg aggregation.Aggregation, argEnd time.Time) error { + output = argAgg + end = argEnd + return nil + }) if err != nil { - t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) + t.Fatalf("recordAggregationsFromPoints(%v) = err(%v), want ", input, err) } if output.Kind() != aggregation.HistogramKind { - t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.HistogramKind) + t.Errorf("recordAggregationsFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.HistogramKind) } - if output.end() != now { - t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) + if end != now { + t.Errorf("recordAggregationsFromPoints(%v).end() = %v, want %v", input, end, now) } distAgg, ok := output.(aggregation.Histogram) if !ok { - t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) + t.Errorf("recordAggregationsFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) } sum, err := distAgg.Sum() if err != nil { t.Fatalf("Unexpected err: %v", err) } if sum.AsFloat64() != float64(55) { - t.Errorf("newAggregationFromPoints(%v).Sum() = %v, want %v", input, sum.AsFloat64(), float64(55)) + t.Errorf("recordAggregationsFromPoints(%v).Sum() = %v, want %v", input, sum.AsFloat64(), float64(55)) } count, err := distAgg.Count() if err != nil { t.Fatalf("Unexpected err: %v", err) } if count != 2 { - t.Errorf("newAggregationFromPoints(%v).Count() = %v, want %v", input, count, 2) + t.Errorf("recordAggregationsFromPoints(%v).Count() = %v, want %v", input, count, 2) } hist, err := distAgg.Histogram() if err != nil { @@ -322,20 +303,20 @@ func TestHistogramAggregation(t *testing.T) { } inputBucketBoundaries := []float64{20, 30} if len(hist.Boundaries) != len(inputBucketBoundaries) { - t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d boundaries, want %d boundaries", input, len(hist.Boundaries), len(inputBucketBoundaries)) + t.Fatalf("recordAggregationsFromPoints(%v).Histogram() produced %d boundaries, want %d boundaries", input, len(hist.Boundaries), len(inputBucketBoundaries)) } for i, b := range hist.Boundaries { if b != inputBucketBoundaries[i] { - t.Errorf("newAggregationFromPoints(%v).Histogram().Boundaries[%d] = %v, want %v", input, i, b, inputBucketBoundaries[i]) + t.Errorf("recordAggregationsFromPoints(%v).Histogram().Boundaries[%d] = %v, want %v", input, i, b, inputBucketBoundaries[i]) } } inputBucketCounts := []uint64{1, 1} if len(hist.Counts) != len(inputBucketCounts) { - t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d buckets, want %d buckets", input, len(hist.Counts), len(inputBucketCounts)) + t.Fatalf("recordAggregationsFromPoints(%v).Histogram() produced %d buckets, want %d buckets", input, len(hist.Counts), len(inputBucketCounts)) } for i, c := range hist.Counts { if c != inputBucketCounts[i] { - t.Errorf("newAggregationFromPoints(%v).Histogram().Counts[%d] = %d, want %d", input, i, c, inputBucketCounts[i]) + t.Errorf("recordAggregationsFromPoints(%v).Histogram().Counts[%d] = %d, want %d", input, i, c, inputBucketCounts[i]) } } } diff --git a/bridge/opencensus/exporter.go b/bridge/opencensus/exporter.go index d52494367d1..1720c67f21f 100644 --- a/bridge/opencensus/exporter.go +++ b/bridge/opencensus/exporter.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "sync" + "time" "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" @@ -95,18 +96,18 @@ func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export. otel.Handle(err) continue } - agg, err := newAggregationFromPoints(ts.Points) - if err != nil { - otel.Handle(err) - continue - } - if err := f(export.NewRecord( - &descriptor, - &ls, - agg, - ts.StartTime, - agg.end(), - )); err != nil && !errors.Is(err, aggregation.ErrNoData) { + err = recordAggregationsFromPoints( + ts.Points, + func(agg aggregation.Aggregation, end time.Time) error { + return f(export.NewRecord( + &descriptor, + &ls, + agg, + ts.StartTime, + end, + )) + }) + if err != nil && !errors.Is(err, aggregation.ErrNoData) { return err } } diff --git a/bridge/opencensus/exporter_test.go b/bridge/opencensus/exporter_test.go index 934cdac80ea..0910cb5720b 100644 --- a/bridge/opencensus/exporter_test.go +++ b/bridge/opencensus/exporter_test.go @@ -141,7 +141,7 @@ func TestExportMetrics(t *testing.T) { }, }, }, - expectedHandledError: errIncompatibleType, + exportErr: errIncompatibleType, }, { desc: "success", @@ -171,13 +171,9 @@ func TestExportMetrics(t *testing.T) { export.NewRecord( &basicDesc, attribute.EmptySet(), - &ocExactAggregator{ - points: []aggregation.Point{ - { - Number: number.NewInt64Number(123), - Time: now, - }, - }, + &ocRawAggregator{ + value: number.NewInt64Number(123), + time: now, }, now, now, @@ -202,13 +198,9 @@ func TestExportMetrics(t *testing.T) { export.NewRecord( &basicDesc, attribute.EmptySet(), - &ocExactAggregator{ - points: []aggregation.Point{ - { - Number: number.NewInt64Number(123), - Time: now, - }, - }, + &ocRawAggregator{ + value: number.NewInt64Number(123), + time: now, }, now, now, @@ -236,13 +228,9 @@ func TestExportMetrics(t *testing.T) { export.NewRecord( &basicDesc, attribute.EmptySet(), - &ocExactAggregator{ - points: []aggregation.Point{ - { - Number: number.NewInt64Number(123), - Time: now, - }, - }, + &ocRawAggregator{ + value: number.NewInt64Number(123), + time: now, }, now, now, diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go index b535afdcc48..2b342fbc17f 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go @@ -275,70 +275,11 @@ func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record } return gaugePoint(r, value, time.Time{}, tm) - case aggregation.ExactKind: - e, ok := agg.(aggregation.Points) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) - } - pts, err := e.Points() - if err != nil { - return nil, err - } - - return gaugeArray(r, pts) - default: return nil, fmt.Errorf("%w: %T", ErrUnimplementedAgg, agg) } } -func gaugeArray(record export.Record, points []aggregation.Point) (*metricpb.Metric, error) { - desc := record.Descriptor() - labels := record.Labels() - m := &metricpb.Metric{ - Name: desc.Name(), - Description: desc.Description(), - Unit: string(desc.Unit()), - } - - pbAttrs := Iterator(labels.Iter()) - - ndp := make([]*metricpb.NumberDataPoint, 0, len(points)) - switch nk := desc.NumberKind(); nk { - case number.Int64Kind: - for _, p := range points { - ndp = append(ndp, &metricpb.NumberDataPoint{ - Attributes: pbAttrs, - StartTimeUnixNano: toNanos(record.StartTime()), - TimeUnixNano: toNanos(record.EndTime()), - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: p.Number.CoerceToInt64(nk), - }, - }) - } - case number.Float64Kind: - for _, p := range points { - ndp = append(ndp, &metricpb.NumberDataPoint{ - Attributes: pbAttrs, - StartTimeUnixNano: toNanos(record.StartTime()), - TimeUnixNano: toNanos(record.EndTime()), - Value: &metricpb.NumberDataPoint_AsDouble{ - AsDouble: p.Number.CoerceToFloat64(nk), - }, - }) - } - default: - return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, nk) - } - - m.Data = &metricpb.Metric_Gauge{ - Gauge: &metricpb.Gauge{ - DataPoints: ndp, - }, - } - return m, nil -} - func gaugePoint(record export.Record, num number.Number, start, end time.Time) (*metricpb.Metric, error) { desc := record.Descriptor() labels := record.Labels() diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go index e331666f5a8..120e85852c3 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go @@ -30,7 +30,6 @@ import ( "go.opentelemetry.io/otel/metric/sdkapi" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - arrAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -288,75 +287,6 @@ func TestLastValueIntDataPoints(t *testing.T) { } } -func TestExactIntDataPoints(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind) - labels := attribute.NewSet(attribute.String("one", "1")) - arrs := arrAgg.New(2) - e, ckpt := &arrs[0], &arrs[1] - - assert.NoError(t, e.Update(context.Background(), number.Number(100), &desc)) - require.NoError(t, e.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) - pts, err := ckpt.Points() - require.NoError(t, err) - - if m, err := gaugeArray(record, pts); assert.NoError(t, err) { - assert.Equal(t, []*metricpb.NumberDataPoint{{ - StartTimeUnixNano: toNanos(intervalStart), - TimeUnixNano: toNanos(intervalEnd), - Attributes: []*commonpb.KeyValue{ - { - Key: "one", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}}, - }, - }, - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: 100, - }, - }}, m.GetGauge().DataPoints) - assert.Nil(t, m.GetSum()) - assert.Nil(t, m.GetHistogram()) - assert.Nil(t, m.GetSummary()) - assert.Nil(t, m.GetIntGauge()) // nolint - assert.Nil(t, m.GetIntSum()) // nolint - assert.Nil(t, m.GetIntHistogram()) // nolint - } -} - -func TestExactFloatDataPoints(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Float64Kind) - labels := attribute.NewSet(attribute.String("one", "1")) - arrs := arrAgg.New(2) - e, ckpt := &arrs[0], &arrs[1] - assert.NoError(t, e.Update(context.Background(), number.NewFloat64Number(100), &desc)) - require.NoError(t, e.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) - pts, err := ckpt.Points() - require.NoError(t, err) - - if m, err := gaugeArray(record, pts); assert.NoError(t, err) { - assert.Equal(t, []*metricpb.NumberDataPoint{{ - Value: &metricpb.NumberDataPoint_AsDouble{ - AsDouble: 100, - }, - StartTimeUnixNano: toNanos(intervalStart), - TimeUnixNano: toNanos(intervalEnd), - Attributes: []*commonpb.KeyValue{ - { - Key: "one", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}}, - }, - }, - }}, m.GetGauge().DataPoints) - assert.Nil(t, m.GetSum()) - assert.Nil(t, m.GetHistogram()) - assert.Nil(t, m.GetSummary()) - assert.Nil(t, m.GetIntGauge()) // nolint - assert.Nil(t, m.GetIntSum()) // nolint - assert.Nil(t, m.GetIntHistogram()) // nolint - } -} - func TestSumErrUnknownValueType(t *testing.T) { desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Kind(-1)) labels := attribute.NewSet() @@ -469,12 +399,6 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) { require.Error(t, err) require.Nil(t, mpb) require.True(t, errors.Is(err, ErrIncompatibleAgg)) - - mpb, err = makeMpb(aggregation.ExactKind, &lastvalue.New(1)[0]) - - require.Error(t, err) - require.Nil(t, mpb) - require.True(t, errors.Is(err, ErrIncompatibleAgg)) } func TestRecordAggregatorUnexpectedErrors(t *testing.T) { diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go index ec9b4fd68c5..f37c39095f7 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go @@ -48,7 +48,7 @@ func Example_insecure() { pusher := controller.New( processor.NewFactory( - simple.NewWithExactDistribution(), + simple.NewWithHistogramDistribution(), exp, ), controller.WithExporter(exp), @@ -107,7 +107,7 @@ func Example_withTLS() { pusher := controller.New( processor.NewFactory( - simple.NewWithExactDistribution(), + simple.NewWithHistogramDistribution(), exp, ), controller.WithExporter(exp), @@ -164,7 +164,7 @@ func Example_withDifferentSignalCollectors() { pusher := controller.New( processor.NewFactory( - simple.NewWithExactDistribution(), + simple.NewWithHistogramDistribution(), exp, ), controller.WithExporter(exp), diff --git a/exporters/prometheus/prometheus.go b/exporters/prometheus/prometheus.go index 9fc276caafc..369c0ce1fe1 100644 --- a/exporters/prometheus/prometheus.go +++ b/exporters/prometheus/prometheus.go @@ -58,7 +58,7 @@ type Exporter struct { } // ErrUnsupportedAggregator is returned for unrepresentable aggregator -// types (e.g., exact). +// types. var ErrUnsupportedAggregator = fmt.Errorf("unsupported aggregator type") var _ http.Handler = &Exporter{} diff --git a/sdk/export/metric/aggregation/aggregation.go b/sdk/export/metric/aggregation/aggregation.go index 80060a99e84..51d3bd31c45 100644 --- a/sdk/export/metric/aggregation/aggregation.go +++ b/sdk/export/metric/aggregation/aggregation.go @@ -64,22 +64,6 @@ type ( LastValue() (number.Number, time.Time, error) } - // Points returns the raw values that were aggregated. - Points interface { - Aggregation - - // Points returns points in the order they were - // recorded. Points are approximately ordered by - // timestamp, but this is not guaranteed. - Points() ([]Point, error) - } - - // Point is a raw data point, consisting of a number and value. - Point struct { - number.Number - time.Time - } - // Buckets represents histogram buckets boundaries and counts. // // For a Histogram with N defined boundaries, e.g, [x, y, z]. @@ -134,7 +118,6 @@ const ( MinMaxSumCountKind Kind = "MinMaxSumCount" HistogramKind Kind = "Histogram" LastValueKind Kind = "Lastvalue" - ExactKind Kind = "Exact" ) // Sentinel errors for Aggregation interface. diff --git a/sdk/metric/aggregator/exact/exact.go b/sdk/metric/aggregator/exact/exact.go deleted file mode 100644 index 336cd878fd0..00000000000 --- a/sdk/metric/aggregator/exact/exact.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package exact // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" - -import ( - "context" - "sync" - "time" - - "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/metric/sdkapi" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/aggregator" -) - -type ( - // Aggregator aggregates events that form a distribution, keeping - // an array with the exact set of values. - Aggregator struct { - lock sync.Mutex - samples []aggregation.Point - } -) - -var _ export.Aggregator = &Aggregator{} -var _ aggregation.Points = &Aggregator{} -var _ aggregation.Count = &Aggregator{} - -// New returns cnt many new exact aggregators, which aggregate recorded -// measurements by storing them in an array. This type uses a mutex -// for Update() and SynchronizedMove() concurrency. -func New(cnt int) []Aggregator { - return make([]Aggregator, cnt) -} - -// Aggregation returns an interface for reading the state of this aggregator. -func (c *Aggregator) Aggregation() aggregation.Aggregation { - return c -} - -// Kind returns aggregation.ExactKind. -func (c *Aggregator) Kind() aggregation.Kind { - return aggregation.ExactKind -} - -// Count returns the number of values in the checkpoint. -func (c *Aggregator) Count() (uint64, error) { - return uint64(len(c.samples)), nil -} - -// Points returns access to the raw data set. -func (c *Aggregator) Points() ([]aggregation.Point, error) { - return c.samples, nil -} - -// SynchronizedMove saves the current state to oa and resets the current state to -// the empty set, taking a lock to prevent concurrent Update() calls. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error { - o, _ := oa.(*Aggregator) - - if oa != nil && o == nil { - return aggregator.NewInconsistentAggregatorError(c, oa) - } - - c.lock.Lock() - defer c.lock.Unlock() - - if o != nil { - o.samples = c.samples - } - c.samples = nil - - return nil -} - -// Update adds the recorded measurement to the current data set. -// Update takes a lock to prevent concurrent Update() and SynchronizedMove() -// calls. -func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error { - now := time.Now() - c.lock.Lock() - defer c.lock.Unlock() - c.samples = append(c.samples, aggregation.Point{ - Number: number, - Time: now, - }) - - return nil -} - -// Merge combines two data sets into one. -func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error { - o, _ := oa.(*Aggregator) - if o == nil { - return aggregator.NewInconsistentAggregatorError(c, oa) - } - - c.samples = combine(c.samples, o.samples) - return nil -} - -func combine(a, b []aggregation.Point) []aggregation.Point { - result := make([]aggregation.Point, 0, len(a)+len(b)) - - for len(a) != 0 && len(b) != 0 { - if a[0].Time.Before(b[0].Time) { - result = append(result, a[0]) - a = a[1:] - } else { - result = append(result, b[0]) - b = b[1:] - } - } - result = append(result, a...) - result = append(result, b...) - return result -} diff --git a/sdk/metric/aggregator/exact/exact_test.go b/sdk/metric/aggregator/exact/exact_test.go deleted file mode 100644 index 0ef1fdf9379..00000000000 --- a/sdk/metric/aggregator/exact/exact_test.go +++ /dev/null @@ -1,377 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package exact - -import ( - "fmt" - "math" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/metric/sdkapi" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" -) - -type updateTest struct { - count int -} - -func requireNotAfter(t *testing.T, t1, t2 time.Time) { - require.False(t, t1.After(t2), "expected %v ≤ %v", t1, t2) -} - -func checkZero(t *testing.T, agg *Aggregator, desc *sdkapi.Descriptor) { - count, err := agg.Count() - require.NoError(t, err) - require.Equal(t, uint64(0), count) - - pts, err := agg.Points() - require.NoError(t, err) - require.Equal(t, 0, len(pts)) -} - -func new2() (_, _ *Aggregator) { - alloc := New(2) - return &alloc[0], &alloc[1] -} - -func new4() (_, _, _, _ *Aggregator) { - alloc := New(4) - return &alloc[0], &alloc[1], &alloc[2], &alloc[3] -} - -func sumOf(samples []aggregation.Point, k number.Kind) number.Number { - var n number.Number - for _, s := range samples { - n.AddNumber(k, s.Number) - } - return n -} - -func (ut *updateTest) run(t *testing.T, profile aggregatortest.Profile) { - descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind) - agg, ckpt := new2() - - all := aggregatortest.NewNumbers(profile.NumberKind) - - for i := 0; i < ut.count; i++ { - x := profile.Random(+1) - all.Append(x) - advance() - aggregatortest.CheckedUpdate(t, agg, x, descriptor) - - y := profile.Random(-1) - all.Append(y) - advance() - aggregatortest.CheckedUpdate(t, agg, y, descriptor) - } - - err := agg.SynchronizedMove(ckpt, descriptor) - require.NoError(t, err) - - checkZero(t, agg, descriptor) - - all.Sort() - - pts, err := ckpt.Points() - require.Nil(t, err) - sum := sumOf(pts, profile.NumberKind) - allSum := all.Sum() - require.InEpsilon(t, - allSum.CoerceToFloat64(profile.NumberKind), - sum.CoerceToFloat64(profile.NumberKind), - 0.0000001, - "Same sum") - count, err := ckpt.Count() - require.Nil(t, err) - require.Equal(t, all.Count(), count, "Same count") -} - -func TestExactUpdate(t *testing.T) { - // Test with an odd an even number of measurements - for count := 999; count <= 1000; count++ { - t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t *testing.T) { - ut := updateTest{ - count: count, - } - - // Test integer and floating point - aggregatortest.RunProfiles(t, ut.run) - }) - } -} - -type mergeTest struct { - count int - absolute bool -} - -func advance() { - time.Sleep(time.Nanosecond) -} - -func (mt *mergeTest) run(t *testing.T, profile aggregatortest.Profile) { - descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind) - agg1, agg2, ckpt1, ckpt2 := new4() - - all := aggregatortest.NewNumbers(profile.NumberKind) - - for i := 0; i < mt.count; i++ { - x1 := profile.Random(+1) - all.Append(x1) - advance() - aggregatortest.CheckedUpdate(t, agg1, x1, descriptor) - - x2 := profile.Random(+1) - all.Append(x2) - advance() - aggregatortest.CheckedUpdate(t, agg2, x2, descriptor) - - if !mt.absolute { - y1 := profile.Random(-1) - all.Append(y1) - advance() - aggregatortest.CheckedUpdate(t, agg1, y1, descriptor) - - y2 := profile.Random(-1) - all.Append(y2) - advance() - aggregatortest.CheckedUpdate(t, agg2, y2, descriptor) - } - } - - require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor)) - require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor)) - - checkZero(t, agg1, descriptor) - checkZero(t, agg2, descriptor) - - aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor) - - pts, err := ckpt1.Points() - require.Nil(t, err) - - received := aggregatortest.NewNumbers(profile.NumberKind) - for i, s := range pts { - received.Append(s.Number) - - if i > 0 { - requireNotAfter(t, pts[i-1].Time, pts[i].Time) - } - } - - allSum := all.Sum() - sum := sumOf(pts, profile.NumberKind) - require.InEpsilon(t, - allSum.CoerceToFloat64(profile.NumberKind), - sum.CoerceToFloat64(profile.NumberKind), - 0.0000001, - "Same sum - absolute") - count, err := ckpt1.Count() - require.Nil(t, err) - require.Equal(t, all.Count(), count, "Same count - absolute") - require.Equal(t, all, received, "Same ordered contents") -} - -func TestExactMerge(t *testing.T) { - // Test with an odd an even number of measurements - for count := 999; count <= 1000; count++ { - t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t *testing.T) { - // Test absolute and non-absolute - for _, absolute := range []bool{false, true} { - t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) { - mt := mergeTest{ - count: count, - absolute: absolute, - } - - // Test integer and floating point - aggregatortest.RunProfiles(t, mt.run) - }) - } - }) - } -} - -func TestExactErrors(t *testing.T) { - aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { - agg, ckpt := new2() - - descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind) - - advance() - aggregatortest.CheckedUpdate(t, agg, number.Number(0), descriptor) - - if profile.NumberKind == number.Float64Kind { - advance() - aggregatortest.CheckedUpdate(t, agg, number.NewFloat64Number(math.NaN()), descriptor) - } - require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - - count, err := ckpt.Count() - require.Equal(t, uint64(1), count, "NaN value was not counted") - require.Nil(t, err) - }) -} - -func TestExactFloat64(t *testing.T) { - descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, number.Float64Kind) - - fpsf := func(sign int) []float64 { - // Check behavior of a bunch of odd floating - // points except for NaN, which is invalid. - return []float64{ - 0, - 1 / math.Inf(sign), - 1, - 2, - 1e100, - math.MaxFloat64, - math.SmallestNonzeroFloat64, - math.MaxFloat32, - math.SmallestNonzeroFloat32, - math.E, - math.Pi, - math.Phi, - math.Sqrt2, - math.SqrtE, - math.SqrtPi, - math.SqrtPhi, - math.Ln2, - math.Log2E, - math.Ln10, - math.Log10E, - } - } - - all := aggregatortest.NewNumbers(number.Float64Kind) - - agg, ckpt := new2() - - startTime := time.Now() - - for _, f := range fpsf(1) { - all.Append(number.NewFloat64Number(f)) - advance() - aggregatortest.CheckedUpdate(t, agg, number.NewFloat64Number(f), descriptor) - } - - for _, f := range fpsf(-1) { - all.Append(number.NewFloat64Number(f)) - advance() - aggregatortest.CheckedUpdate(t, agg, number.NewFloat64Number(f), descriptor) - } - - endTime := time.Now() - - require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - - pts, err := ckpt.Points() - require.Nil(t, err) - - allSum := all.Sum() - sum := sumOf(pts, number.Float64Kind) - require.InEpsilon(t, allSum.AsFloat64(), sum.AsFloat64(), 0.0000001, "Same sum") - - count, err := ckpt.Count() - require.Equal(t, all.Count(), count, "Same count") - require.Nil(t, err) - - po, err := ckpt.Points() - require.Nil(t, err) - require.Equal(t, all.Len(), len(po), "Points() must have same length of updates") - for i := 0; i < len(po); i++ { - require.Equal(t, all.Points()[i], po[i].Number, "Wrong point at position %d", i) - if i > 0 { - requireNotAfter(t, po[i-1].Time, po[i].Time) - } - } - requireNotAfter(t, startTime, po[0].Time) - requireNotAfter(t, po[len(po)-1].Time, endTime) -} - -func TestSynchronizedMoveReset(t *testing.T) { - aggregatortest.SynchronizedMoveResetTest( - t, - sdkapi.HistogramInstrumentKind, - func(desc *sdkapi.Descriptor) export.Aggregator { - return &New(1)[0] - }, - ) -} - -func TestMergeBehavior(t *testing.T) { - aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { - for _, forward := range []bool{false, true} { - t.Run(fmt.Sprint("Forward=", forward), func(t *testing.T) { - descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind) - agg1, agg2, ckpt, _ := new4() - - all := aggregatortest.NewNumbers(profile.NumberKind) - - for i := 0; i < 100; i++ { - x1 := profile.Random(+1) - all.Append(x1) - advance() - aggregatortest.CheckedUpdate(t, agg1, x1, descriptor) - } - - for i := 0; i < 100; i++ { - x2 := profile.Random(+1) - all.Append(x2) - advance() - aggregatortest.CheckedUpdate(t, agg2, x2, descriptor) - } - - if forward { - aggregatortest.CheckedMerge(t, ckpt, agg1, descriptor) - aggregatortest.CheckedMerge(t, ckpt, agg2, descriptor) - } else { - aggregatortest.CheckedMerge(t, ckpt, agg2, descriptor) - aggregatortest.CheckedMerge(t, ckpt, agg1, descriptor) - } - - pts, err := ckpt.Points() - require.NoError(t, err) - - received := aggregatortest.NewNumbers(profile.NumberKind) - for i, s := range pts { - received.Append(s.Number) - - if i > 0 { - requireNotAfter(t, pts[i-1].Time, pts[i].Time) - } - } - - allSum := all.Sum() - sum := sumOf(pts, profile.NumberKind) - require.InEpsilon(t, - allSum.CoerceToFloat64(profile.NumberKind), - sum.CoerceToFloat64(profile.NumberKind), - 0.0000001, - "Same sum - absolute") - count, err := ckpt.Count() - require.NoError(t, err) - require.Equal(t, all.Count(), count, "Same count - absolute") - require.Equal(t, all, received, "Same ordered contents") - }) - } - }) -} diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 773a99de0a5..e41a3f9924d 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -136,7 +136,7 @@ func TestInputRangeHistogram(t *testing.T) { ctx := context.Background() meter, sdk, _, processor := newSDK(t) - histogram := Must(meter).NewFloat64Histogram("name.exact") + histogram := Must(meter).NewFloat64Histogram("name.histogram") histogram.Record(ctx, math.NaN()) require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush()) @@ -151,7 +151,7 @@ func TestInputRangeHistogram(t *testing.T) { checkpointed = sdk.Collect(ctx) require.Equal(t, map[string]float64{ - "name.exact//": 3, + "name.histogram//": 3, }, processor.Values()) require.Equal(t, 1, checkpointed) require.Nil(t, testHandler.Flush()) @@ -450,8 +450,8 @@ func TestRecordBatch(t *testing.T) { counter1 := Must(meter).NewInt64Counter("int64.sum") counter2 := Must(meter).NewFloat64Counter("float64.sum") - histogram1 := Must(meter).NewInt64Histogram("int64.exact") - histogram2 := Must(meter).NewFloat64Histogram("float64.exact") + histogram1 := Must(meter).NewInt64Histogram("int64.histogram") + histogram2 := Must(meter).NewFloat64Histogram("float64.histogram") sdk.RecordBatch( ctx, @@ -468,10 +468,10 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) require.EqualValues(t, map[string]float64{ - "int64.sum/A=B,C=D/": 1, - "float64.sum/A=B,C=D/": 2, - "int64.exact/A=B,C=D/": 3, - "float64.exact/A=B,C=D/": 4, + "int64.sum/A=B,C=D/": 1, + "float64.sum/A=B,C=D/": 2, + "int64.histogram/A=B,C=D/": 3, + "float64.histogram/A=B,C=D/": 4, }, processor.Values()) } diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index d4d90e95050..c972a963fb6 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -83,7 +83,6 @@ func TestProcessor(t *testing.T) { {kind: aggregation.MinMaxSumCountKind}, {kind: aggregation.HistogramKind}, {kind: aggregation.LastValueKind}, - {kind: aggregation.ExactKind}, } { t.Run(ac.kind.String(), func(t *testing.T) { testProcessor( diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index bb6f18f7e67..6f7e56ef8b8 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -22,12 +22,10 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" @@ -208,11 +206,6 @@ func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ... for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } - case strings.HasSuffix(desc.Name(), ".exact"): - aggs := exact.New(len(aggPtrs)) - for i := range aggPtrs { - *aggPtrs[i] = &aggs[i] - } default: panic(fmt.Sprint("Invalid instrument name for test AggregatorSelector: ", desc.Name())) } @@ -292,13 +285,6 @@ func (o *Output) Map() map[string]float64 { } else if l, ok := entry.aggregator.(aggregation.LastValue); ok { last, _, _ := l.LastValue() value = last.CoerceToFloat64(key.desc.NumberKind()) - } else if l, ok := entry.aggregator.(aggregation.Points); ok { - pts, _ := l.Points() - var sum number.Number - for _, s := range pts { - sum.AddNumber(key.desc.NumberKind(), s.Number) - } - value = sum.CoerceToFloat64(key.desc.NumberKind()) } else { panic(fmt.Sprintf("Unhandled aggregator type: %T", entry.aggregator)) } diff --git a/sdk/metric/processor/reducer/doc.go b/sdk/metric/processor/reducer/doc.go index cebdf0355e4..700a9c97844 100644 --- a/sdk/metric/processor/reducer/doc.go +++ b/sdk/metric/processor/reducer/doc.go @@ -41,20 +41,20 @@ func (someFilter) LabelFilterFor(_ *sdkapi.Descriptor) attribute.Filter { } func setupMetrics(exporter export.Exporter) (stop func()) { - basicProcessor := basic.New( - simple.NewWithExactDistribution(), + basicProcessorFactory := basic.NewFactory( + simple.NewWithHistogramDistribution(), exporter, ) - reducerProcessor := reducer.New(someFilter{...}, basicProcessor) + reducerProcessor := reducer.NewFactory(someFilter{...}, basicProcessorFactory) - pusher := push.New( + controller := controller.New( reducerProcessor, exporter, - pushOpts..., + opts..., ) - pusher.Start() - global.SetMeterProvider(pusher.Provider()) - return pusher.Stop + controller.Start() + global.SetMeterProvider(controller.Provider()) + return controller.Stop */ package reducer // import "go.opentelemetry.io/otel/sdk/metric/processor/reducer" diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index 0f06827f05f..03cd9d7312b 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -17,7 +17,6 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple" import ( "go.opentelemetry.io/otel/metric/sdkapi" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" @@ -26,7 +25,6 @@ import ( type ( selectorInexpensive struct{} - selectorExact struct{} selectorHistogram struct { options []histogram.Option } @@ -34,7 +32,6 @@ type ( var ( _ export.AggregatorSelector = selectorInexpensive{} - _ export.AggregatorSelector = selectorExact{} _ export.AggregatorSelector = selectorHistogram{} ) @@ -47,15 +44,6 @@ func NewWithInexpensiveDistribution() export.AggregatorSelector { return selectorInexpensive{} } -// NewWithExactDistribution returns a simple aggregator selector that -// uses exact aggregators for `Histogram` instruments. This -// selector uses more memory than the others in this package because -// exact aggregators maintain the most information about the -// distribution among these choices. -func NewWithExactDistribution() export.AggregatorSelector { - return selectorExact{} -} - // NewWithHistogramDistribution returns a simple aggregator selector // that uses histogram aggregators for `Histogram` instruments. // This selector is a good default choice for most metric exporters. @@ -91,20 +79,6 @@ func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs } } -func (selectorExact) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { - switch descriptor.InstrumentKind() { - case sdkapi.GaugeObserverInstrumentKind: - lastValueAggs(aggPtrs) - case sdkapi.HistogramInstrumentKind: - aggs := exact.New(len(aggPtrs)) - for i := range aggPtrs { - *aggPtrs[i] = &aggs[i] - } - default: - sumAggs(aggPtrs) - } -} - func (s selectorHistogram) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) { switch descriptor.InstrumentKind() { case sdkapi.GaugeObserverInstrumentKind: diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index 12e629d0403..4f3746bb007 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" @@ -60,12 +59,6 @@ func TestInexpensiveDistribution(t *testing.T) { testFixedSelectors(t, inex) } -func TestExactDistribution(t *testing.T) { - ex := simple.NewWithExactDistribution() - require.IsType(t, (*exact.Aggregator)(nil), oneAgg(ex, &testHistogramDesc)) - testFixedSelectors(t, ex) -} - func TestHistogramDistribution(t *testing.T) { hist := simple.NewWithHistogramDistribution() require.IsType(t, (*histogram.Aggregator)(nil), oneAgg(hist, &testHistogramDesc))