diff --git a/CHANGELOG.md b/CHANGELOG.md index f8f29e5c979..1a47d2b6451 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed - The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369) +- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398) ## [1.11.1/0.33.0] 2022-10-19 diff --git a/metric/instrument/asyncfloat64/asyncfloat64.go b/metric/instrument/asyncfloat64/asyncfloat64.go index 5c8260ceb6f..17bda2ceee1 100644 --- a/metric/instrument/asyncfloat64/asyncfloat64.go +++ b/metric/instrument/asyncfloat64/asyncfloat64.go @@ -35,8 +35,8 @@ type InstrumentProvider interface { // Counter is an instrument that records increasing values. type Counter interface { - // Observe records the state of the instrument to be x. The value of x is - // assumed to be the exact Counter value to record. + // Observe records the state of the instrument to be x. Implementations + // will assume x to be the cumulative sum of the count. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -48,8 +48,8 @@ type Counter interface { // UpDownCounter is an instrument that records increasing or decreasing values. type UpDownCounter interface { - // Observe records the state of the instrument to be x. The value of x is - // assumed to be the exact UpDownCounter value to record. + // Observe records the state of the instrument to be x. Implementations + // will assume x to be the cumulative sum of the count. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an diff --git a/metric/instrument/asyncint64/asyncint64.go b/metric/instrument/asyncint64/asyncint64.go index b07409c7931..1e17988e827 100644 --- a/metric/instrument/asyncint64/asyncint64.go +++ b/metric/instrument/asyncint64/asyncint64.go @@ -35,8 +35,8 @@ type InstrumentProvider interface { // Counter is an instrument that records increasing values. type Counter interface { - // Observe records the state of the instrument to be x. The value of x is - // assumed to be the exact Counter value to record. + // Observe records the state of the instrument to be x. Implementations + // will assume x to be the cumulative sum of the count. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an @@ -48,8 +48,8 @@ type Counter interface { // UpDownCounter is an instrument that records increasing or decreasing values. type UpDownCounter interface { - // Observe records the state of the instrument to be x. The value of x is - // assumed to be the exact UpDownCounter value to record. + // Observe records the state of the instrument to be x. Implementations + // will assume x to be the cumulative sum of the count. // // It is only valid to call this within a callback. If called outside of the // registered callback it should have no effect on the instrument, and an diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 61b8d20860e..210d2370120 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -177,7 +177,66 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { // The output Aggregation will report recorded values as delta temporality. It // is up to the caller to ensure this is accurate. func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { - return &precomputedSum[N]{settableSum: newDeltaSum[N](monotonic)} + return &precomputedDeltaSum[N]{ + recorded: make(map[attribute.Set]N), + reported: make(map[attribute.Set]N), + monotonic: monotonic, + start: now(), + } +} + +// precomputedDeltaSum summarizes a set of measurements recorded over all +// aggregation cycles as the delta arithmetic sum. +type precomputedDeltaSum[N int64 | float64] struct { + sync.Mutex + recorded map[attribute.Set]N + reported map[attribute.Set]N + + monotonic bool + start time.Time +} + +// Aggregate records value as a cumulative sum for attr. +func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) { + s.Lock() + s.recorded[attr] = value + s.Unlock() +} + +func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { + out := metricdata.Sum[N]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: s.monotonic, + } + + s.Lock() + defer s.Unlock() + + if len(s.recorded) == 0 { + return out + } + + t := now() + out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.recorded)) + for attr, recorded := range s.recorded { + value := recorded - s.reported[attr] + out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ + Attributes: attr, + StartTime: s.start, + Time: t, + Value: value, + }) + if value != 0 { + s.reported[attr] = recorded + } + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + } + // The delta collection cycle resets. + s.start = t + return out } // NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of @@ -191,21 +250,16 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { // The output Aggregation will report recorded values as cumulative // temporality. It is up to the caller to ensure this is accurate. func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { - return &precomputedSum[N]{settableSum: newCumulativeSum[N](monotonic)} -} - -type settableSum[N int64 | float64] interface { - set(value N, attr attribute.Set) - Aggregation() metricdata.Aggregation + return &precomputedSum[N]{newCumulativeSum[N](monotonic)} } // precomputedSum summarizes a set of measurements recorded over all -// aggregation cycles directly as an arithmetic sum. +// aggregation cycles directly as the cumulative arithmetic sum. type precomputedSum[N int64 | float64] struct { - settableSum[N] + *cumulativeSum[N] } -// Aggregate records value directly as a sum for attr. +// Aggregate records value as a cumulative sum for attr. func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) { s.set(value, attr) } diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index eda320070ff..92e3b394675 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -56,22 +56,22 @@ func testSum[N int64 | float64](t *testing.T) { }) t.Run("PreComputedDelta", func(t *testing.T) { - incr, mono, temp := monoIncr, true, metricdata.DeltaTemporality - eFunc := preExpecter[N](incr, mono, temp) + incr, mono := monoIncr, true + eFunc := preDeltaExpecter[N](incr, mono) t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr, false - eFunc = preExpecter[N](incr, mono, temp) + eFunc = preDeltaExpecter[N](incr, mono) t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc)) }) t.Run("PreComputedCumulative", func(t *testing.T) { - incr, mono, temp := monoIncr, true, metricdata.CumulativeTemporality - eFunc := preExpecter[N](incr, mono, temp) + incr, mono := monoIncr, true + eFunc := preCumuExpecter[N](incr, mono) t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr, false - eFunc = preExpecter[N](incr, mono, temp) + eFunc = preCumuExpecter[N](incr, mono) t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc)) }) } @@ -100,8 +100,22 @@ func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { } } -func preExpecter[N int64 | float64](incr setMap, mono bool, temp metricdata.Temporality) expectFunc { - sum := metricdata.Sum[N]{Temporality: temp, IsMonotonic: mono} +func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { + sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} + last := make(map[attribute.Set]N) + return func(int) metricdata.Aggregation { + sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) + for a, v := range incr { + l := last[a] + sum.DataPoints = append(sum.DataPoints, point(a, N(v)-l)) + last[a] = N(v) + } + return sum + } +} + +func preCumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc { + sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} return func(int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr {