Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate delta sums for delta async counter/up-down-counter types #3398

Merged
merged 3 commits into from Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- The `go.opentelemetry.io/otel/exporters/prometheus` exporter fixes duplicated `_total` suffixes. (#3369)
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)

## [1.11.1/0.33.0] 2022-10-19

Expand Down
8 changes: 4 additions & 4 deletions metric/instrument/asyncfloat64/asyncfloat64.go
Expand Up @@ -35,8 +35,8 @@ type InstrumentProvider interface {

// Counter is an instrument that records increasing values.
type Counter interface {
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact Counter value to record.
// Observe records the state of the instrument to be x. Implementations
// will assume x to be the cumulative sum of the count.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -48,8 +48,8 @@ type Counter interface {

// UpDownCounter is an instrument that records increasing or decreasing values.
type UpDownCounter interface {
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact UpDownCounter value to record.
// Observe records the state of the instrument to be x. Implementations
// will assume x to be the cumulative sum of the count.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand Down
8 changes: 4 additions & 4 deletions metric/instrument/asyncint64/asyncint64.go
Expand Up @@ -35,8 +35,8 @@ type InstrumentProvider interface {

// Counter is an instrument that records increasing values.
type Counter interface {
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact Counter value to record.
// Observe records the state of the instrument to be x. Implementations
// will assume x to be the cumulative sum of the count.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand All @@ -48,8 +48,8 @@ type Counter interface {

// UpDownCounter is an instrument that records increasing or decreasing values.
type UpDownCounter interface {
// Observe records the state of the instrument to be x. The value of x is
// assumed to be the exact UpDownCounter value to record.
// Observe records the state of the instrument to be x. Implementations
// will assume x to be the cumulative sum of the count.
//
// It is only valid to call this within a callback. If called outside of the
// registered callback it should have no effect on the instrument, and an
Expand Down
74 changes: 64 additions & 10 deletions sdk/metric/internal/sum.go
Expand Up @@ -177,7 +177,66 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
// The output Aggregation will report recorded values as delta temporality. It
// is up to the caller to ensure this is accurate.
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedSum[N]{settableSum: newDeltaSum[N](monotonic)}
return &precomputedDeltaSum[N]{
recorded: make(map[attribute.Set]N),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
}
}

// precomputedDeltaSum summarizes a set of measurements recorded over all
// aggregation cycles as the delta arithmetic sum.
type precomputedDeltaSum[N int64 | float64] struct {
sync.Mutex
recorded map[attribute.Set]N
reported map[attribute.Set]N

monotonic bool
start time.Time
}

// Aggregate records value as a cumulative sum for attr.
func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.recorded[attr] = value
s.Unlock()
}

func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
}

s.Lock()
defer s.Unlock()

if len(s.recorded) == 0 {
return out
}

t := now()
out.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.recorded))
for attr, recorded := range s.recorded {
value := recorded - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value,
})
if value != 0 {
s.reported[attr] = recorded
}
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
// The delta collection cycle resets.
s.start = t
return out
}

// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
Expand All @@ -191,21 +250,16 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
// The output Aggregation will report recorded values as cumulative
// temporality. It is up to the caller to ensure this is accurate.
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedSum[N]{settableSum: newCumulativeSum[N](monotonic)}
}

type settableSum[N int64 | float64] interface {
set(value N, attr attribute.Set)
Aggregation() metricdata.Aggregation
return &precomputedSum[N]{newCumulativeSum[N](monotonic)}
}

// precomputedSum summarizes a set of measurements recorded over all
// aggregation cycles directly as an arithmetic sum.
// aggregation cycles directly as the cumulative arithmetic sum.
type precomputedSum[N int64 | float64] struct {
settableSum[N]
*cumulativeSum[N]
}

// Aggregate records value directly as a sum for attr.
// Aggregate records value as a cumulative sum for attr.
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
}
30 changes: 22 additions & 8 deletions sdk/metric/internal/sum_test.go
Expand Up @@ -56,22 +56,22 @@ func testSum[N int64 | float64](t *testing.T) {
})

t.Run("PreComputedDelta", func(t *testing.T) {
incr, mono, temp := monoIncr, true, metricdata.DeltaTemporality
eFunc := preExpecter[N](incr, mono, temp)
incr, mono := monoIncr, true
eFunc := preDeltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
eFunc = preExpecter[N](incr, mono, temp)
eFunc = preDeltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
})

t.Run("PreComputedCumulative", func(t *testing.T) {
incr, mono, temp := monoIncr, true, metricdata.CumulativeTemporality
eFunc := preExpecter[N](incr, mono, temp)
incr, mono := monoIncr, true
eFunc := preCumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
eFunc = preExpecter[N](incr, mono, temp)
eFunc = preCumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
})
}
Expand Down Expand Up @@ -100,8 +100,22 @@ func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
}
}

func preExpecter[N int64 | float64](incr setMap, mono bool, temp metricdata.Temporality) expectFunc {
sum := metricdata.Sum[N]{Temporality: temp, IsMonotonic: mono}
func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
last := make(map[attribute.Set]N)
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
l := last[a]
sum.DataPoints = append(sum.DataPoints, point(a, N(v)-l))
last[a] = N(v)
}
return sum
}
}

func preCumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
Expand Down