diff --git a/CHANGELOG.md b/CHANGELOG.md index c18496aaf6c..8c8566a8fbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,6 +107,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed +- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549) - The `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/sdk/metric` only registers a callback for instruments created by that meter. Trying to register a callback with instruments from a different meter will result in an error being returned. (#3584) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 952e9a4a8bd..42694d87020 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -22,13 +22,13 @@ import ( ) // now is used to return the current local time while allowing tests to -// override the the default time.Now function. +// override the default time.Now function. var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. // -// Aggregators need to be comparable so they can be de-duplicated by the SDK when -// it creates them for multiple views. +// Aggregators need to be comparable so they can be de-duplicated by the SDK +// when it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. @@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface { // measurements made and ends an aggregation cycle. Aggregation() metricdata.Aggregation } + +// precomputeAggregator is an Aggregator that receives values to aggregate that +// have been pre-computed by the caller. +type precomputeAggregator[N int64 | float64] interface { + // The Aggregate method of the embedded Aggregator is used to record + // pre-computed measurements, scoped by attributes that have not been + // filtered by an attribute filter. + Aggregator[N] + + // aggregateFiltered records measurements scoped by attributes that have + // been filtered by an attribute filter. + // + // Pre-computed measurements of filtered attributes need to be recorded + // separate from those that haven't been filtered so they can be added to + // the non-filtered pre-computed measurements in a collection cycle and + // then resets after the cycle (the non-filtered pre-computed measurements + // are not reset). + aggregateFiltered(N, attribute.Set) +} diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index 86e73c866dc..4d24b62819a 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -21,8 +21,26 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// filter is an aggregator that applies attribute filter when Aggregating. filters -// do not have any backing memory, and must be constructed with a backing Aggregator. +// NewFilter returns an Aggregator that wraps an agg with an attribute +// filtering function. Both pre-computed non-pre-computed Aggregators can be +// passed for agg. An appropriate Aggregator will be returned for the detected +// type. +func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] { + if fn == nil { + return agg + } + if fa, ok := agg.(precomputeAggregator[N]); ok { + return newPrecomputedFilter(fa, fn) + } + return newFilter(agg, fn) +} + +// filter wraps an aggregator with an attribute filter. All recorded +// measurements will have their attributes filtered before they are passed to +// the underlying aggregator's Aggregate method. +// +// This should not be used to wrap a pre-computed Aggregator. Use a +// precomputedFilter instead. type filter[N int64 | float64] struct { filter attribute.Filter aggregator Aggregator[N] @@ -31,15 +49,16 @@ type filter[N int64 | float64] struct { seen map[attribute.Set]attribute.Set } -// NewFilter wraps an Aggregator with an attribute filtering function. -func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] { - if fn == nil { - return agg - } +// newFilter returns an filter Aggregator that wraps agg with the attribute +// filter fn. +// +// This should not be used to wrap a pre-computed Aggregator. Use a +// precomputedFilter instead. +func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] { return &filter[N]{ filter: fn, aggregator: agg, - seen: map[attribute.Set]attribute.Set{}, + seen: make(map[attribute.Set]attribute.Set), } } @@ -62,3 +81,54 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { func (f *filter[N]) Aggregation() metricdata.Aggregation { return f.aggregator.Aggregation() } + +// precomputedFilter is an aggregator that applies attribute filter when +// Aggregating for pre-computed Aggregations. The pre-computed Aggregations +// need to operate normally when no attribute filtering is done (for sums this +// means setting the value), but when attribute filtering is done it needs to +// be added to any set value. +type precomputedFilter[N int64 | float64] struct { + filter attribute.Filter + aggregator precomputeAggregator[N] + + sync.Mutex + seen map[attribute.Set]attribute.Set +} + +// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg +// with the attribute filter fn. +// +// This should not be used to wrap a non-pre-computed Aggregator. Use a +// precomputedFilter instead. +func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] { + return &precomputedFilter[N]{ + filter: fn, + aggregator: agg, + seen: make(map[attribute.Set]attribute.Set), + } +} + +// Aggregate records the measurement, scoped by attr, and aggregates it +// into an aggregation. +func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { + // TODO (#3006): drop stale attributes from seen. + f.Lock() + defer f.Unlock() + fAttr, ok := f.seen[attr] + if !ok { + fAttr, _ = attr.Filter(f.filter) + f.seen[attr] = fAttr + } + if fAttr.Equals(&attr) { + // No filtering done. + f.aggregator.Aggregate(measurement, fAttr) + } else { + f.aggregator.aggregateFiltered(measurement, fAttr) + } +} + +// Aggregation returns an Aggregation, for all the aggregated +// measurements made and ends an aggregation cycle. +func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation { + return f.aggregator.Aggregation() +} diff --git a/sdk/metric/internal/filter_test.go b/sdk/metric/internal/filter_test.go index e5632156b30..e1333c1d4d1 100644 --- a/sdk/metric/internal/filter_test.go +++ b/sdk/metric/internal/filter_test.go @@ -15,6 +15,8 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( + "fmt" + "strings" "sync" "testing" @@ -194,3 +196,90 @@ func TestFilterConcurrent(t *testing.T) { testFilterConcurrent[float64](t) }) } + +func TestPrecomputedFilter(t *testing.T) { + t.Run("Int64", testPrecomputedFilter[int64]()) + t.Run("Float64", testPrecomputedFilter[float64]()) +} + +func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + agg := newTestFilterAgg[N]() + f := NewFilter[N](agg, testAttributeFilter) + require.IsType(t, &precomputedFilter[N]{}, f) + + var ( + powerLevel = attribute.Int("power-level", 9000) + user = attribute.String("user", "Alice") + admin = attribute.Bool("admin", true) + ) + a := attribute.NewSet(powerLevel) + key := a + f.Aggregate(1, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(0), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel, user) + f.Aggregate(2, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(2), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel, user, admin) + f.Aggregate(3, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel) + f.Aggregate(2, a) + assert.Equal(t, N(2), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(user) + f.Aggregate(3, a) + assert.Equal(t, N(2), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a)) + + _ = f.Aggregation() + assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation") + } +} + +func str(a attribute.Set) string { + iter := a.Iter() + out := make([]string, 0, iter.Len()) + for iter.Next() { + kv := iter.Attribute() + out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface())) + } + return strings.Join(out, ",") +} + +type testFilterAgg[N int64 | float64] struct { + values map[attribute.Set]precomputedValue[N] + aggregationN int +} + +func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] { + return &testFilterAgg[N]{ + values: make(map[attribute.Set]precomputedValue[N]), + } +} + +func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) { + v := a.values[attr] + v.measured = val + a.values[attr] = v +} + +// nolint: unused // Used to agg filtered. +func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) { + v := a.values[attr] + v.filtered += val + a.values[attr] = v +} + +func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation { + a.aggregationN++ + return nil +} diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index ecf22f44b6d..7783dc66490 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -22,7 +22,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// valueMap is the storage for all sums. +// valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex values map[attribute.Set]N @@ -32,12 +32,6 @@ func newValueMap[N int64 | float64]() *valueMap[N] { return &valueMap[N]{values: make(map[attribute.Set]N)} } -func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used. - s.Lock() - s.values[attr] = value - s.Unlock() -} - func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { s.Lock() s.values[attr] += value @@ -164,48 +158,107 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { return out } +// precomputedValue is the recorded measurement value for a set of attributes. +type precomputedValue[N int64 | float64] struct { + // measured is the last value measured for a set of attributes that were + // not filtered. + measured N + // filtered is the sum of values from measurements that had their + // attributes filtered. + filtered N +} + +// precomputedMap is the storage for precomputed sums. +type precomputedMap[N int64 | float64] struct { + sync.Mutex + values map[attribute.Set]precomputedValue[N] +} + +func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] { + return &precomputedMap[N]{ + values: make(map[attribute.Set]precomputedValue[N]), + } +} + +// Aggregate records value with the unfiltered attributes attr. +// +// If a previous measurement was made for the same attribute set: +// +// - If that measurement's attributes were not filtered, this value overwrite +// that value. +// - If that measurement's attributes were filtered, this value will be +// recorded along side that value. +func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) { + s.Lock() + v := s.values[attr] + v.measured = value + s.values[attr] = v + s.Unlock() +} + +// aggregateFiltered records value with the filtered attributes attr. +// +// If a previous measurement was made for the same attribute set: +// +// - If that measurement's attributes were not filtered, this value will be +// recorded along side that value. +// - If that measurement's attributes were filtered, this value will be +// added to it. +// +// This method should not be used if attr have not been reduced by an attribute +// filter. +func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered. + s.Lock() + v := s.values[attr] + v.filtered += value + s.values[attr] = v + s.Unlock() +} + // NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of -// measurements as their pre-computed arithmetic sum. Each sum is scoped by -// attributes and the aggregation cycle the measurements were made in. +// pre-computed sums. Each sum is scoped by attributes and the aggregation +// cycle the measurements were made in. // // The monotonic value is used to communicate the produced Aggregation is // monotonic or not. The returned Aggregator does not make any guarantees this // value is accurate. It is up to the caller to ensure it. // -// The output Aggregation will report recorded values as delta temporality. It -// is up to the caller to ensure this is accurate. +// The output Aggregation will report recorded values as delta temporality. func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { return &precomputedDeltaSum[N]{ - recorded: make(map[attribute.Set]N), - reported: make(map[attribute.Set]N), - monotonic: monotonic, - start: now(), + precomputedMap: newPrecomputedMap[N](), + reported: make(map[attribute.Set]N), + monotonic: monotonic, + start: now(), } } -// precomputedDeltaSum summarizes a set of measurements recorded over all -// aggregation cycles as the delta arithmetic sum. +// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all +// aggregation cycles as the delta of these sums. type precomputedDeltaSum[N int64 | float64] struct { - sync.Mutex - recorded map[attribute.Set]N + *precomputedMap[N] + reported map[attribute.Set]N monotonic bool start time.Time } -// Aggregate records value as a cumulative sum for attr. -func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) { - s.Lock() - s.recorded[attr] = value - s.Unlock() -} - +// Aggregation returns the recorded pre-computed sums as an Aggregation. The +// sum values are expressed as the delta between what was measured this +// collection cycle and the previous. +// +// All pre-computed sums that were recorded for attributes sets reduced by an +// attribute filter (filtered-sums) are summed together and added to any +// pre-computed sum value recorded directly for the resulting attribute set +// (unfiltered-sum). The filtered-sums are reset to zero for the next +// collection cycle, and the unfiltered-sum is kept for the next collection +// cycle. func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() - if len(s.recorded) == 0 { + if len(s.values) == 0 { return nil } @@ -213,19 +266,22 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { out := metricdata.Sum[N]{ Temporality: metricdata.DeltaTemporality, IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.recorded)), + DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), } - for attr, recorded := range s.recorded { - value := recorded - s.reported[attr] + for attr, value := range s.values { + v := value.measured + value.filtered + delta := v - s.reported[attr] out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ Attributes: attr, StartTime: s.start, Time: t, - Value: value, + Value: delta, }) - if value != 0 { - s.reported[attr] = recorded + if delta != 0 { + s.reported[attr] = v } + value.filtered = N(0) + s.values[attr] = value // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -237,26 +293,68 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { } // NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of -// measurements as their pre-computed arithmetic sum. Each sum is scoped by -// attributes and the aggregation cycle the measurements were made in. +// pre-computed sums. Each sum is scoped by attributes and the aggregation +// cycle the measurements were made in. // // The monotonic value is used to communicate the produced Aggregation is // monotonic or not. The returned Aggregator does not make any guarantees this // value is accurate. It is up to the caller to ensure it. // // The output Aggregation will report recorded values as cumulative -// temporality. It is up to the caller to ensure this is accurate. +// temporality. func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { - return &precomputedSum[N]{newCumulativeSum[N](monotonic)} + return &precomputedCumulativeSum[N]{ + precomputedMap: newPrecomputedMap[N](), + monotonic: monotonic, + start: now(), + } } -// precomputedSum summarizes a set of measurements recorded over all -// aggregation cycles directly as the cumulative arithmetic sum. -type precomputedSum[N int64 | float64] struct { - *cumulativeSum[N] +// precomputedCumulativeSum directly records and reports a set of pre-computed sums. +type precomputedCumulativeSum[N int64 | float64] struct { + *precomputedMap[N] + + monotonic bool + start time.Time } -// Aggregate records value as a cumulative sum for attr. -func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) { - s.set(value, attr) +// Aggregation returns the recorded pre-computed sums as an Aggregation. The +// sum values are expressed directly as they are assumed to be recorded as the +// cumulative sum of a some measured phenomena. +// +// All pre-computed sums that were recorded for attributes sets reduced by an +// attribute filter (filtered-sums) are summed together and added to any +// pre-computed sum value recorded directly for the resulting attribute set +// (unfiltered-sum). The filtered-sums are reset to zero for the next +// collection cycle, and the unfiltered-sum is kept for the next collection +// cycle. +func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { + s.Lock() + defer s.Unlock() + + if len(s.values) == 0 { + return nil + } + + t := now() + out := metricdata.Sum[N]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: s.monotonic, + DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), + } + for attr, value := range s.values { + out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ + Attributes: attr, + StartTime: s.start, + Time: t, + Value: value.measured + value.filtered, + }) + value.filtered = N(0) + s.values[attr] = value + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + } + return out } diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index 359b73aa50a..cde79aaa92b 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -163,6 +164,134 @@ func TestDeltaSumReset(t *testing.T) { t.Run("Float64", testDeltaSumReset[float64]) } +func TestPreComputedDeltaSum(t *testing.T) { + var mono bool + agg := NewPrecomputedDeltaSum[int64](mono) + require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + + attrs := attribute.NewSet(attribute.String("key", "val")) + agg.Aggregate(1, attrs) + got := agg.Aggregation() + want := metricdata.Sum[int64]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, + } + opt := metricdatatest.IgnoreTimestamp() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Delta values should zero. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) + got = agg.Aggregation() + // measured(+): 1, previous(-): 1, filtered(+): 1 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + // measured(+): 1, previous(-): 2, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + got = agg.Aggregation() + // measured(+): 1, previous(-): 1, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Override set value. + agg.Aggregate(2, attrs) + agg.Aggregate(5, attrs) + // Filtered should add. + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) + got = agg.Aggregation() + // measured(+): 5, previous(-): 1, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + agg.Aggregate(5, attrs) + got = agg.Aggregation() + // measured(+): 5, previous(-): 18, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Order should not affect measure. + // Filtered should add. + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.Aggregate(7, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) + got = agg.Aggregation() + // measured(+): 7, previous(-): 5, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + agg.Aggregate(7, attrs) + got = agg.Aggregation() + // measured(+): 7, previous(-): 20, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) +} + +func TestPreComputedCumulativeSum(t *testing.T) { + var mono bool + agg := NewPrecomputedCumulativeSum[int64](mono) + require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + + attrs := attribute.NewSet(attribute.String("key", "val")) + agg.Aggregate(1, attrs) + got := agg.Aggregation() + want := metricdata.Sum[int64]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, + } + opt := metricdatatest.IgnoreTimestamp() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Cumulative values should persist. + got = agg.Aggregation() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Override set value. + agg.Aggregate(5, attrs) + // Filtered should add. + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Order should not affect measure. + // Filtered should add. + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.Aggregate(7, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) +} + func TestEmptySumNilAggregation(t *testing.T) { assert.Nil(t, NewCumulativeSum[int64](true).Aggregation()) assert.Nil(t, NewCumulativeSum[int64](false).Aggregation()) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 15c86d63156..190528e3c51 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -896,6 +896,11 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { } func TestAttributeFilter(t *testing.T) { + t.Run("Delta", testAttributeFilter(metricdata.DeltaTemporality)) + t.Run("Cumulative", testAttributeFilter(metricdata.CumulativeTemporality)) +} + +func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { one := 1.0 two := 2.0 testcases := []struct { @@ -912,7 +917,8 @@ func TestAttributeFilter(t *testing.T) { } _, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) - o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar")) + o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) return nil }, ctr) return err @@ -923,10 +929,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[float64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 2.0, // TODO (#3439): This should be 3.0. + Value: 4.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -940,7 +946,8 @@ func TestAttributeFilter(t *testing.T) { } _, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) - o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) + o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar")) + o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) return nil }, ctr) return err @@ -951,10 +958,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[float64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 2.0, // TODO (#3439): This should be 3.0. + Value: 4.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -994,7 +1001,8 @@ func TestAttributeFilter(t *testing.T) { } _, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) - o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + o.ObserveInt64(ctr, 20, attribute.String("foo", "bar")) + o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 2)) return nil }, ctr) return err @@ -1005,10 +1013,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 20, // TODO (#3439): This should be 30. + Value: 40, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -1022,7 +1030,8 @@ func TestAttributeFilter(t *testing.T) { } _, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error { o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) - o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) + o.ObserveInt64(ctr, 20, attribute.String("foo", "bar")) + o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 2)) return nil }, ctr) return err @@ -1033,10 +1042,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 20, // TODO (#3439): This should be 30. + Value: 40, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -1088,7 +1097,7 @@ func TestAttributeFilter(t *testing.T) { Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -1114,7 +1123,7 @@ func TestAttributeFilter(t *testing.T) { Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -1145,7 +1154,7 @@ func TestAttributeFilter(t *testing.T) { Sum: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, }, }, }, @@ -1170,7 +1179,7 @@ func TestAttributeFilter(t *testing.T) { Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -1196,7 +1205,7 @@ func TestAttributeFilter(t *testing.T) { Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -1227,37 +1236,282 @@ func TestAttributeFilter(t *testing.T) { Sum: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, }, }, }, } - for _, tt := range testcases { - t.Run(tt.name, func(t *testing.T) { - rdr := NewManualReader() - mtr := NewMeterProvider( - WithReader(rdr), - WithView(NewView( - Instrument{Name: "*"}, - Stream{AttributeFilter: func(kv attribute.KeyValue) bool { - return kv.Key == attribute.Key("foo") - }}, - )), - ).Meter("TestAttributeFilter") - require.NoError(t, tt.register(t, mtr)) - - m, err := rdr.Collect(context.Background()) - assert.NoError(t, err) + return func(t *testing.T) { + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + rdr := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return temporality + })) + mtr := NewMeterProvider( + WithReader(rdr), + WithView(NewView( + Instrument{Name: "*"}, + Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("foo") + }}, + )), + ).Meter("TestAttributeFilter") + require.NoError(t, tt.register(t, mtr)) + + m, err := rdr.Collect(context.Background()) + assert.NoError(t, err) - require.Len(t, m.ScopeMetrics, 1) - require.Len(t, m.ScopeMetrics[0].Metrics, 1) + require.Len(t, m.ScopeMetrics, 1) + require.Len(t, m.ScopeMetrics[0].Metrics, 1) - metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) - }) + metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) + }) + } } } +func TestAsynchronousExample(t *testing.T) { + // This example can be found: + // https://github.com/open-telemetry/opentelemetry-specification/blob/8b91585e6175dd52b51e1d60bea105041225e35d/specification/metrics/supplementary-guidelines.md#asynchronous-example + var ( + threadID1 = attribute.Int("tid", 1) + threadID2 = attribute.Int("tid", 2) + threadID3 = attribute.Int("tid", 3) + + processID1001 = attribute.String("pid", "1001") + + thread1 = attribute.NewSet(processID1001, threadID1) + thread2 = attribute.NewSet(processID1001, threadID2) + thread3 = attribute.NewSet(processID1001, threadID3) + + process1001 = attribute.NewSet(processID1001) + ) + + setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics, *int64, *int64, *int64) { + t.Helper() + + const ( + instName = "pageFaults" + filteredStream = "filteredPageFaults" + scopeName = "AsynchronousExample" + ) + + selector := func(InstrumentKind) metricdata.Temporality { return temp } + reader := NewManualReader(WithTemporalitySelector(selector)) + + noopFilter := func(kv attribute.KeyValue) bool { return true } + noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter}) + + filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" } + filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter}) + + mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered)) + meter := mp.Meter(scopeName) + + observations := make(map[attribute.Set]int64) + _, err := meter.Int64ObservableCounter(instName, instrument.WithInt64Callback( + func(ctx context.Context, o instrument.Int64Observer) error { + for attrSet, val := range observations { + o.Observe(ctx, val, attrSet.ToSlice()...) + } + return nil + }, + )) + require.NoError(t, err) + + want := &metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: scopeName}, + Metrics: []metricdata.Metrics{ + { + Name: filteredStream, + Data: metricdata.Sum[int64]{ + Temporality: temp, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + }, + }, + { + Name: instName, + Data: metricdata.Sum[int64]{ + Temporality: temp, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + }, + }, + }, + } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + collect := func(t *testing.T) { + t.Helper() + got, err := reader.Collect(context.Background()) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + } + + return observations, collect, want, wantFiltered, wantThread1, wantThread2 + } + + t.Run("Cumulative", func(t *testing.T) { + temporality := metricdata.CumulativeTemporality + observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality) + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 91 + *wantThread1 = 53 + *wantThread2 = 38 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 98 + *wantThread1 = 56 + *wantThread2 = 42 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 107 + *wantThread1 = 60 + *wantThread2 = 47 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = 58 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. + {Attributes: thread1, Value: 60}, + {Attributes: thread2, Value: 53}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) + + t.Run("Delta", func(t *testing.T) { + temporality := metricdata.DeltaTemporality + observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality) + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 11 + *wantThread1 = 3 + *wantThread2 = 8 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 7 + *wantThread1 = 3 + *wantThread2 = 4 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 9 + *wantThread1 = 4 + *wantThread2 = 5 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = -49 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. + {Attributes: thread1, Value: 0}, + {Attributes: thread2, Value: 6}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) +} + var ( aiCounter instrument.Int64ObservableCounter aiUpDownCounter instrument.Int64ObservableUpDownCounter