Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove exact aggregator for histogram instruments #2348

Merged
merged 10 commits into from Nov 15, 2021
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -22,7 +22,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc".WithGRPCConn` option so the exporter can reuse an existing gRPC connection. (#2002)
- Added a new `schema` module to help parse Schema Files in OTEP 0152 format. (#2267)
- Added a new `MapCarrier` to the `go.opentelemetry.io/otel/propagation` package to hold propagated coss-cutting concerns as a `map[string]string` held in memory. (#2334)
- Added a new `MapCarrier` to the `go.opentelemetry.io/otel/propagation` package to hold propagated cross-cutting concerns as a `map[string]string` held in memory. (#2334)

### Removed

- Metric SDK removes the "exact" aggregator for histogram instruments, as it performed a non-standard aggregation for OTLP export (creating repeated Gauge points) and worked its way into a number of confusing examples.
jmacd marked this conversation as resolved.
Show resolved Hide resolved

## [1.1.0] - 2021-10-27

Expand Down
105 changes: 42 additions & 63 deletions bridge/opencensus/aggregation.go
Expand Up @@ -31,127 +31,111 @@ var (
errBadPoint = errors.New("point cannot be converted")
)

// aggregationWithEndTime is an aggregation that can also provide the timestamp
// of the last recorded point.
type aggregationWithEndTime interface {
aggregation.Aggregation
end() time.Time
}
type recordFunc func(agg aggregation.Aggregation, end time.Time) error

// newAggregationFromPoints creates an OpenTelemetry aggregation from
// OpenCensus points. Points may not be empty and must be either
// recordAggregationsFromPoints records one OpenTelemetry aggregation for
// each OpenCensus point. Points may not be empty and must be either
// all (int|float)64 or all *metricdata.Distribution.
func newAggregationFromPoints(points []metricdata.Point) (aggregationWithEndTime, error) {
func recordAggregationsFromPoints(
points []metricdata.Point,
recorder recordFunc) error {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
if len(points) == 0 {
return nil, errEmpty
return errEmpty
}
switch t := points[0].Value.(type) {
case int64:
return newExactAggregator(points)
return recordGaugePoints(points, recorder)
case float64:
return newExactAggregator(points)
return recordGaugePoints(points, recorder)
case *metricdata.Distribution:
return newDistributionAggregator(points)
return recordDistributionPoint(points, recorder)
default:
// TODO add *metricdata.Summary support
return nil, fmt.Errorf("%w: %v", errIncompatibleType, t)
return fmt.Errorf("%w: %v", errIncompatibleType, t)
}
}

var _ aggregation.Aggregation = &ocExactAggregator{}
var _ aggregation.LastValue = &ocExactAggregator{}
var _ aggregation.Points = &ocExactAggregator{}
var _ aggregation.Aggregation = &ocRawAggregator{}
var _ aggregation.LastValue = &ocRawAggregator{}

// newExactAggregator creates an OpenTelemetry aggreation from OpenCensus points.
// recordGaugePoints creates an OpenTelemetry aggregation from OpenCensus points.
// Points may not be empty, and must only contain integers or floats.
func newExactAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) {
points := make([]aggregation.Point, len(pts))
for i, pt := range pts {
func recordGaugePoints(pts []metricdata.Point, recorder recordFunc) error {
for _, pt := range pts {
switch t := pt.Value.(type) {
case int64:
points[i] = aggregation.Point{
Number: number.NewInt64Number(pt.Value.(int64)),
Time: pt.Time,
if err := recorder(&ocRawAggregator{
value: number.NewInt64Number(pt.Value.(int64)),
time: pt.Time,
}, pt.Time); err != nil {
return err
}
case float64:
points[i] = aggregation.Point{
Number: number.NewFloat64Number(pt.Value.(float64)),
Time: pt.Time,
if err := recorder(&ocRawAggregator{
value: number.NewFloat64Number(pt.Value.(float64)),
time: pt.Time,
}, pt.Time); err != nil {
return err
}
default:
return nil, fmt.Errorf("%w: %v", errIncompatibleType, t)
return fmt.Errorf("%w: %v", errIncompatibleType, t)
}
}
return &ocExactAggregator{
points: points,
}, nil
return nil
}

type ocExactAggregator struct {
points []aggregation.Point
type ocRawAggregator struct {
value number.Number
time time.Time
}

// Kind returns the kind of aggregation this is.
func (o *ocExactAggregator) Kind() aggregation.Kind {
return aggregation.ExactKind
}

// Points returns access to the raw data set.
func (o *ocExactAggregator) Points() ([]aggregation.Point, error) {
return o.points, nil
func (o *ocRawAggregator) Kind() aggregation.Kind {
return aggregation.LastValueKind
}

// LastValue returns the last point.
func (o *ocExactAggregator) LastValue() (number.Number, time.Time, error) {
last := o.points[len(o.points)-1]
return last.Number, last.Time, nil
}

// end returns the timestamp of the last point
func (o *ocExactAggregator) end() time.Time {
_, t, _ := o.LastValue()
return t
func (o *ocRawAggregator) LastValue() (number.Number, time.Time, error) {
return o.value, o.time, nil
}

var _ aggregation.Aggregation = &ocDistAggregator{}
var _ aggregation.Histogram = &ocDistAggregator{}

// newDistributionAggregator creates an OpenTelemetry aggreation from
// recordDistributionPoint creates an OpenTelemetry aggregation from
// OpenCensus points. Points may not be empty, and must only contain
// Distributions. The most recent disribution will be used in the aggregation.
func newDistributionAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) {
func recordDistributionPoint(pts []metricdata.Point, recorder recordFunc) error {
// only use the most recent datapoint for now.
pt := pts[len(pts)-1]
val, ok := pt.Value.(*metricdata.Distribution)
if !ok {
return nil, fmt.Errorf("%w: %v", errBadPoint, pt.Value)
return fmt.Errorf("%w: %v", errBadPoint, pt.Value)
}
bucketCounts := make([]uint64, len(val.Buckets))
for i, bucket := range val.Buckets {
if bucket.Count < 0 {
return nil, fmt.Errorf("%w: bucket count may not be negative", errBadPoint)
return fmt.Errorf("%w: bucket count may not be negative", errBadPoint)
}
bucketCounts[i] = uint64(bucket.Count)
}
if val.Count < 0 {
return nil, fmt.Errorf("%w: count may not be negative", errBadPoint)
return fmt.Errorf("%w: count may not be negative", errBadPoint)
}
return &ocDistAggregator{
return recorder(&ocDistAggregator{
sum: number.NewFloat64Number(val.Sum),
count: uint64(val.Count),
buckets: aggregation.Buckets{
Boundaries: val.BucketOptions.Bounds,
Counts: bucketCounts,
},
endTime: pts[len(pts)-1].Time,
}, nil
}, pts[len(pts)-1].Time)
}

type ocDistAggregator struct {
sum number.Number
count uint64
buckets aggregation.Buckets
endTime time.Time
}

// Kind returns the kind of aggregation this is.
Expand All @@ -173,8 +157,3 @@ func (o *ocDistAggregator) Count() (uint64, error) {
func (o *ocDistAggregator) Histogram() (aggregation.Buckets, error) {
return o.buckets, nil
}

// end returns the time the histogram was measured.
func (o *ocDistAggregator) end() time.Time {
return o.endTime
}