Metric SDK: Remove distinction between filtered and unfiltered attributes and drop non-observed attribute sets.
dashpole committed Jul 7, 2023
1 parent c404a30 commit fdae38e
Showing 7 changed files with 70 additions and 324 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Count the Collect time in the PeriodicReader timeout. (#4221)
 - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
 - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
+- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4289)
+- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)
 
 ### Fixed
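Taken together, the two new entries change how asynchronous (Observable) instruments behave. Below is a hedged sketch of the new semantics using the public `metric` API as of this release; the instrument name, attribute, and values are illustrative only:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	meter := sdkmetric.NewMeterProvider().Meter("example")

	counter, err := meter.Int64ObservableCounter("requests")
	if err != nil {
		panic(err)
	}

	_, err = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		attrs := metric.WithAttributes(attribute.String("host", "a"))
		// After #4289: observing the same attribute set twice in one
		// callback exports the sum (3); previously the last value (2) won.
		o.ObserveInt64(counter, 1, attrs)
		o.ObserveInt64(counter, 2, attrs)
		// After #4289: any attribute set NOT observed in this callback is
		// dropped; its value from a previous cycle is no longer re-exported.
		return nil
	}, counter)
	if err != nil {
		panic(err)
	}
}
```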
19 changes: 0 additions & 19 deletions sdk/metric/internal/aggregate/aggregator.go
@@ -38,22 +38,3 @@ type Aggregator[N int64 | float64] interface {
 	// measurements made and ends an aggregation cycle.
 	Aggregation() metricdata.Aggregation
 }
-
-// precomputeAggregator is an Aggregator that receives values to aggregate that
-// have been pre-computed by the caller.
-type precomputeAggregator[N int64 | float64] interface {
-	// The Aggregate method of the embedded Aggregator is used to record
-	// pre-computed measurements, scoped by attributes that have not been
-	// filtered by an attribute filter.
-	Aggregator[N]
-
-	// aggregateFiltered records measurements scoped by attributes that have
-	// been filtered by an attribute filter.
-	//
-	// Pre-computed measurements of filtered attributes need to be recorded
-	// separate from those that haven't been filtered so they can be added to
-	// the non-filtered pre-computed measurements in a collection cycle and
-	// then resets after the cycle (the non-filtered pre-computed measurements
-	// are not reset).
-	aggregateFiltered(N, attribute.Set)
-}
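For context, the interface removed above supported a two-track design: unfiltered observations overwrote a stored value, filtered observations accumulated beside it, and the two were merged at collection. The following standalone sketch, with hypothetical types rather than the SDK's, contrasts that with the single summed value this commit moves to:

```go
package main

import "fmt"

// oldValue mirrors the removed two-track bookkeeping.
type oldValue struct {
	measured float64 // last unfiltered observation wins
	filtered float64 // filtered observations accumulate
}

// newValue is all that remains once the distinction is dropped.
type newValue struct{ sum float64 }

func main() {
	old := oldValue{}
	old.measured = 5  // Aggregate: overwrite
	old.filtered += 3 // aggregateFiltered: accumulate
	fmt.Println(old.measured + old.filtered) // 8, merged only at collection

	nw := newValue{}
	nw.sum += 5 // every observation is summed,
	nw.sum += 3 // filtered or not
	fmt.Println(nw.sum) // 8, no merge step or filtered-reset needed
}
```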
43 changes: 0 additions & 43 deletions sdk/metric/internal/aggregate/filter.go
@@ -27,9 +27,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
 	if fn == nil {
 		return agg
 	}
-	if fa, ok := agg.(precomputeAggregator[N]); ok {
-		return newPrecomputedFilter(fa, fn)
-	}
 	return newFilter(agg, fn)
 }
 
@@ -68,43 +65,3 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
 func (f *filter[N]) Aggregation() metricdata.Aggregation {
 	return f.aggregator.Aggregation()
 }
-
-// precomputedFilter is an aggregator that applies attribute filter when
-// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
-// need to operate normally when no attribute filtering is done (for sums this
-// means setting the value), but when attribute filtering is done it needs to
-// be added to any set value.
-type precomputedFilter[N int64 | float64] struct {
-	filter     attribute.Filter
-	aggregator precomputeAggregator[N]
-}
-
-// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
-// with the attribute filter fn.
-//
-// This should not be used to wrap a non-pre-computed Aggregator. Use a
-// precomputedFilter instead.
-func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
-	return &precomputedFilter[N]{
-		filter:     fn,
-		aggregator: agg,
-	}
-}
-
-// Aggregate records the measurement, scoped by attr, and aggregates it
-// into an aggregation.
-func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
-	fAttr, _ := attr.Filter(f.filter)
-	if fAttr.Equals(&attr) {
-		// No filtering done.
-		f.aggregator.Aggregate(measurement, fAttr)
-	} else {
-		f.aggregator.aggregateFiltered(measurement, fAttr)
-	}
-}
-
-// Aggregation returns an Aggregation, for all the aggregated
-// measurements made and ends an aggregation cycle.
-func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
-	return f.aggregator.Aggregation()
-}
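With `precomputedFilter` gone, `NewFilter` wraps every aggregator in the same `filter` type, so distinct incoming attribute sets that collapse to one filtered set are simply summed downstream. This standalone sketch, using only the `attribute` package (the filter and sets are illustrative), shows such a collision:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Keep only the "power-level" key; drop everything else.
	keep := attribute.Filter(func(kv attribute.KeyValue) bool {
		return kv.Key == "power-level"
	})

	a := attribute.NewSet(attribute.Int("power-level", 9000), attribute.String("user", "Alice"))
	b := attribute.NewSet(attribute.Int("power-level", 9000), attribute.Bool("admin", true))

	fa, _ := a.Filter(keep)
	fb, _ := b.Filter(keep)

	// Both inputs collapse to {power-level:9000}, so their measurements
	// now land in, and are summed into, a single series.
	fmt.Println(fa.Equals(&fb)) // true
}
```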
89 changes: 0 additions & 89 deletions sdk/metric/internal/aggregate/filter_test.go
@@ -15,8 +15,6 @@
 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
 import (
-	"fmt"
-	"strings"
 	"sync"
 	"testing"
 
@@ -196,90 +194,3 @@ func TestFilterConcurrent(t *testing.T) {
 		testFilterConcurrent[float64](t)
 	})
 }
-
-func TestPrecomputedFilter(t *testing.T) {
-	t.Run("Int64", testPrecomputedFilter[int64]())
-	t.Run("Float64", testPrecomputedFilter[float64]())
-}
-
-func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
-	return func(t *testing.T) {
-		agg := newTestFilterAgg[N]()
-		f := NewFilter[N](agg, testAttributeFilter)
-		require.IsType(t, &precomputedFilter[N]{}, f)
-
-		var (
-			powerLevel = attribute.Int("power-level", 9000)
-			user       = attribute.String("user", "Alice")
-			admin      = attribute.Bool("admin", true)
-		)
-		a := attribute.NewSet(powerLevel)
-		key := a
-		f.Aggregate(1, a)
-		assert.Equal(t, N(1), agg.values[key].measured, str(a))
-		assert.Equal(t, N(0), agg.values[key].filtered, str(a))
-
-		a = attribute.NewSet(powerLevel, user)
-		f.Aggregate(2, a)
-		assert.Equal(t, N(1), agg.values[key].measured, str(a))
-		assert.Equal(t, N(2), agg.values[key].filtered, str(a))
-
-		a = attribute.NewSet(powerLevel, user, admin)
-		f.Aggregate(3, a)
-		assert.Equal(t, N(1), agg.values[key].measured, str(a))
-		assert.Equal(t, N(5), agg.values[key].filtered, str(a))
-
-		a = attribute.NewSet(powerLevel)
-		f.Aggregate(2, a)
-		assert.Equal(t, N(2), agg.values[key].measured, str(a))
-		assert.Equal(t, N(5), agg.values[key].filtered, str(a))
-
-		a = attribute.NewSet(user)
-		f.Aggregate(3, a)
-		assert.Equal(t, N(2), agg.values[key].measured, str(a))
-		assert.Equal(t, N(5), agg.values[key].filtered, str(a))
-		assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))
-
-		_ = f.Aggregation()
-		assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
-	}
-}
-
-func str(a attribute.Set) string {
-	iter := a.Iter()
-	out := make([]string, 0, iter.Len())
-	for iter.Next() {
-		kv := iter.Attribute()
-		out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface()))
-	}
-	return strings.Join(out, ",")
-}
-
-type testFilterAgg[N int64 | float64] struct {
-	values       map[attribute.Set]precomputedValue[N]
-	aggregationN int
-}
-
-func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
-	return &testFilterAgg[N]{
-		values: make(map[attribute.Set]precomputedValue[N]),
-	}
-}
-
-func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
-	v := a.values[attr]
-	v.measured = val
-	a.values[attr] = v
-}
-
-// nolint: unused // Used to agg filtered.
-func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
-	v := a.values[attr]
-	v.filtered += val
-	a.values[attr] = v
-}
-
-func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
-	a.aggregationN++
-	return nil
-}
104 changes: 20 additions & 84 deletions sdk/metric/internal/aggregate/sum.go
@@ -158,63 +158,6 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
 	return out
 }
 
-// precomputedValue is the recorded measurement value for a set of attributes.
-type precomputedValue[N int64 | float64] struct {
-	// measured is the last value measured for a set of attributes that were
-	// not filtered.
-	measured N
-	// filtered is the sum of values from measurements that had their
-	// attributes filtered.
-	filtered N
-}
-
-// precomputedMap is the storage for precomputed sums.
-type precomputedMap[N int64 | float64] struct {
-	sync.Mutex
-	values map[attribute.Set]precomputedValue[N]
-}
-
-func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
-	return &precomputedMap[N]{
-		values: make(map[attribute.Set]precomputedValue[N]),
-	}
-}
-
-// Aggregate records value with the unfiltered attributes attr.
-//
-// If a previous measurement was made for the same attribute set:
-//
-//   - If that measurement's attributes were not filtered, this value overwrite
-//     that value.
-//   - If that measurement's attributes were filtered, this value will be
-//     recorded along side that value.
-func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
-	s.Lock()
-	v := s.values[attr]
-	v.measured = value
-	s.values[attr] = v
-	s.Unlock()
-}
-
-// aggregateFiltered records value with the filtered attributes attr.
-//
-// If a previous measurement was made for the same attribute set:
-//
-//   - If that measurement's attributes were not filtered, this value will be
-//     recorded along side that value.
-//   - If that measurement's attributes were filtered, this value will be
-//     added to it.
-//
-// This method should not be used if attr have not been reduced by an attribute
-// filter.
-func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
-	s.Lock()
-	v := s.values[attr]
-	v.filtered += value
-	s.values[attr] = v
-	s.Unlock()
-}
-
 // NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
 // pre-computed sums. Each sum is scoped by attributes and the aggregation
 // cycle the measurements were made in.
@@ -226,17 +169,17 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { //
 // The output Aggregation will report recorded values as delta temporality.
 func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
 	return &precomputedDeltaSum[N]{
-		precomputedMap: newPrecomputedMap[N](),
-		reported:       make(map[attribute.Set]N),
-		monotonic:      monotonic,
-		start:          now(),
+		valueMap:  newValueMap[N](),
+		reported:  make(map[attribute.Set]N),
+		monotonic: monotonic,
+		start:     now(),
 	}
 }
 
 // precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
 // aggregation cycles as the delta of these sums.
 type precomputedDeltaSum[N int64 | float64] struct {
-	*precomputedMap[N]
+	*valueMap[N]
 
 	reported map[attribute.Set]N
 
@@ -255,10 +198,12 @@ type precomputedDeltaSum[N int64 | float64] struct {
 // collection cycle, and the unfiltered-sum is kept for the next collection
 // cycle.
 func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
+	newReported := make(map[attribute.Set]N)
 	s.Lock()
 	defer s.Unlock()
 
 	if len(s.values) == 0 {
+		s.reported = newReported
 		return nil
 	}
 
@@ -269,24 +214,19 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 		DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
 	}
 	for attr, value := range s.values {
-		v := value.measured + value.filtered
-		delta := v - s.reported[attr]
+		delta := value - s.reported[attr]
 		out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
 			Attributes: attr,
 			StartTime:  s.start,
 			Time:       t,
 			Value:      delta,
 		})
-		if delta != 0 {
-			s.reported[attr] = v
-		}
-		value.filtered = N(0)
-		s.values[attr] = value
-		// TODO (#3006): This will use an unbounded amount of memory if there
-		// are unbounded number of attribute sets being aggregated. Attribute
-		// sets that become "stale" need to be forgotten so this will not
-		// overload the system.
+		newReported[attr] = value
+		// Unused attribute sets do not report.
+		delete(s.values, attr)
 	}
+	// Unused attribute sets are forgotten.
+	s.reported = newReported
 	// The delta collection cycle resets.
 	s.start = t
 	return out
@@ -304,15 +244,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 // temporality.
 func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
 	return &precomputedCumulativeSum[N]{
-		precomputedMap: newPrecomputedMap[N](),
-		monotonic:      monotonic,
-		start:          now(),
+		valueMap:  newValueMap[N](),
+		monotonic: monotonic,
+		start:     now(),
 	}
 }
 
 // precomputedCumulativeSum directly records and reports a set of pre-computed sums.
 type precomputedCumulativeSum[N int64 | float64] struct {
-	*precomputedMap[N]
+	*valueMap[N]
 
 	monotonic bool
 	start     time.Time
@@ -347,14 +287,10 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
 			Attributes: attr,
 			StartTime:  s.start,
 			Time:       t,
-			Value:      value.measured + value.filtered,
+			Value:      value,
 		})
-		value.filtered = N(0)
-		s.values[attr] = value
-		// TODO (#3006): This will use an unbounded amount of memory if there
-		// are unbounded number of attribute sets being aggregated. Attribute
-		// sets that become "stale" need to be forgotten so this will not
-		// overload the system.
+		// Unused attribute sets do not report.
+		delete(s.values, attr)
 	}
 	return out
 }
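Both precomputed Aggregations now follow the same collection pattern: report what was observed this cycle, then clear `s.values` so unobserved attribute sets stop exporting, which also addresses the unbounded-memory TODO (#3006) quoted in the removed lines. A minimal sketch of the delta variant's map swap, with hypothetical names and string keys standing in for attribute sets:

```go
package main

import "fmt"

type deltaSums struct {
	values   map[string]int64 // observations made this cycle
	reported map[string]int64 // values reported last cycle
}

func (s *deltaSums) collect() map[string]int64 {
	out := make(map[string]int64, len(s.values))
	newReported := make(map[string]int64, len(s.values))
	for attr, v := range s.values {
		out[attr] = v - s.reported[attr] // delta since the last cycle
		newReported[attr] = v
		delete(s.values, attr) // unobserved sets will not report next cycle
	}
	s.reported = newReported // sets absent this cycle are forgotten
	return out
}

func main() {
	s := &deltaSums{
		values:   map[string]int64{"host=a": 5},
		reported: map[string]int64{"host=a": 2, "host=b": 7},
	}
	fmt.Println(s.collect()) // map[host=a:3]; host=b is dropped entirely
}
```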
