From 948932aa6db5460b0ffc858a78fce5505b864768 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Mon, 19 Dec 2022 09:16:58 -0800
Subject: [PATCH 01/15] Combine spatially aggregated precomputed vals

Fix #3439

When an attribute filter drops a distinguishing attribute during the
aggregation of a precomputed sum, add that value to the existing value
instead of just setting the value as an override (the current behavior).
---
 sdk/metric/internal/filter.go | 31 ++++++++++++--
 sdk/metric/internal/sum.go | 12 ++++++
 sdk/metric/meter_test.go | 77 +++++++++++++++++++----------------
 3 files changed, 82 insertions(+), 38 deletions(-)

diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go
index 86e73c866dc..ff835928f1c 100644
--- a/sdk/metric/internal/filter.go
+++ b/sdk/metric/internal/filter.go
@@ -21,14 +21,23 @@ import (
 "go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

+type filteredSet struct {
+ filtered bool
+ attrs attribute.Set
+}
+
 // filter is an aggregator that applies attribute filter when Aggregating. filters
 // do not have any backing memory, and must be constructed with a backing Aggregator.
 type filter[N int64 | float64] struct {
 filter attribute.Filter
 aggregator Aggregator[N]

+ // Used to aggregate if an aggregator aggregates values differently for
+ // spatially reaggregated attributes.
+ filtered func(N, attribute.Set)
+
 sync.Mutex
- seen map[attribute.Set]attribute.Set
+ seen map[attribute.Set]filteredSet
 }

 // NewFilter wraps an Aggregator with an attribute filtering function.
@@ -36,10 +45,19 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
 if fn == nil {
 return agg
 }
+ af, ok := agg.(interface{ aggregateFiltered(N, attribute.Set) })
+ if ok {
+ return &filter[N]{
+ filter: fn,
+ aggregator: agg,
+ filtered: af.aggregateFiltered,
+ seen: make(map[attribute.Set]filteredSet),
+ }
+ }
 return &filter[N]{
 filter: fn,
 aggregator: agg,
- seen: map[attribute.Set]attribute.Set{},
+ seen: make(map[attribute.Set]filteredSet),
 }
 }

@@ -51,10 +69,15 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
 defer f.Unlock()
 fAttr, ok := f.seen[attr]
 if !ok {
- fAttr, _ = attr.Filter(f.filter)
+ a, na := attr.Filter(f.filter)
+ fAttr = filteredSet{filtered: len(na) != 0, attrs: a}
 f.seen[attr] = fAttr
 }
- f.aggregator.Aggregate(measurement, fAttr)
+ if fAttr.filtered && f.filtered != nil {
+ f.filtered(measurement, fAttr.attrs)
+ } else {
+ f.aggregator.Aggregate(measurement, fAttr.attrs)
+ }
 }

 // Aggregation returns an Aggregation, for all the aggregated
diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go
index ecf22f44b6d..eace2611f65 100644
--- a/sdk/metric/internal/sum.go
+++ b/sdk/metric/internal/sum.go
@@ -201,6 +201,13 @@ func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
 s.Unlock()
 }

+// aggregateFiltered records value with spatially re-aggregated attrs.
+func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) {
+ s.Lock()
+ s.recorded[attr] += value
+ s.Unlock()
+}
+
 func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 s.Lock()
 defer s.Unlock()
@@ -260,3 +267,8 @@ type precomputedSum[N int64 | float64] struct {
 func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
 s.set(value, attr)
 }
+
+// aggregateFiltered records value with spatially re-aggregated attrs.
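+// For example, if measurements for {"foo": "bar", "version": 1} and
+// {"foo": "bar", "version": 2} are both reduced to {"foo": "bar"} by a
+// filter, the second value must be added to, not overwrite, the first.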
+func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) { + s.valueMap.Aggregate(value, attr) +} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index d904b118ad4..6cbfb512142 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -624,6 +624,11 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { } func TestAttributeFilter(t *testing.T) { + t.Run("Delta", testAttributeFilter(metricdata.DeltaTemporality)) + t.Run("Cumulative", testAttributeFilter(metricdata.CumulativeTemporality)) +} + +func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { one := 1.0 two := 2.0 testcases := []struct { @@ -650,10 +655,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[float64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 2.0, // TODO (#3439): This should be 3.0. + Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -677,10 +682,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[float64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 2.0, // TODO (#3439): This should be 3.0. + Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -729,10 +734,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 20, // TODO (#3439): This should be 30. + Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -756,10 +761,10 @@ func TestAttributeFilter(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Attributes: attribute.NewSet(attribute.String("foo", "bar")), - Value: 20, // TODO (#3439): This should be 30. 
+ Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -810,7 +815,7 @@ func TestAttributeFilter(t *testing.T) { Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -836,7 +841,7 @@ func TestAttributeFilter(t *testing.T) { Value: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -867,7 +872,7 @@ func TestAttributeFilter(t *testing.T) { Sum: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, }, }, }, @@ -892,7 +897,7 @@ func TestAttributeFilter(t *testing.T) { Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: true, }, }, @@ -918,7 +923,7 @@ func TestAttributeFilter(t *testing.T) { Value: 30, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, IsMonotonic: false, }, }, @@ -949,34 +954,38 @@ func TestAttributeFilter(t *testing.T) { Sum: 3.0, }, }, - Temporality: metricdata.CumulativeTemporality, + Temporality: temporality, }, }, }, } - for _, tt := range testcases { - t.Run(tt.name, func(t *testing.T) { - rdr := NewManualReader() - mtr := NewMeterProvider( - WithReader(rdr), - WithView(NewView( - Instrument{Name: "*"}, - Stream{AttributeFilter: func(kv attribute.KeyValue) bool { - return kv.Key == attribute.Key("foo") - }}, - )), - ).Meter("TestAttributeFilter") - require.NoError(t, tt.register(t, mtr)) - - m, err := rdr.Collect(context.Background()) - assert.NoError(t, err) + return func(t *testing.T) { + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + rdr := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return temporality + })) + mtr := NewMeterProvider( + WithReader(rdr), + WithView(NewView( + Instrument{Name: "*"}, + Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key("foo") + }}, + )), + ).Meter("TestAttributeFilter") + require.NoError(t, tt.register(t, mtr)) + + m, err := rdr.Collect(context.Background()) + assert.NoError(t, err) - require.Len(t, m.ScopeMetrics, 1) - require.Len(t, m.ScopeMetrics[0].Metrics, 1) + require.Len(t, m.ScopeMetrics, 1) + require.Len(t, m.ScopeMetrics[0].Metrics, 1) - metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) - }) + metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp()) + }) + } } } From 5896179e5896b32c2496350dc2aecf1fcaab7664 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 19 Dec 2022 10:06:49 -0800 Subject: [PATCH 02/15] Ignore false positive lint error and test method --- sdk/metric/internal/sum.go | 4 ++-- sdk/metric/internal/sum_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index eace2611f65..93244da5907 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -202,7 +202,7 @@ func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) { } // aggregateFiltered records value with spatially re-aggregated attrs. -func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) { +func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter. 
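+ // The only reference to this method is through the anonymous interface
+ // assertion in NewFilter, which the unused linter cannot see.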
s.Lock() s.recorded[attr] += value s.Unlock() @@ -269,6 +269,6 @@ func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) { } // aggregateFiltered records value with spatially re-aggregated attrs. -func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) { +func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter. s.valueMap.Aggregate(value, attr) } diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index 359b73aa50a..19671aec2e1 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -163,6 +164,29 @@ func TestDeltaSumReset(t *testing.T) { t.Run("Float64", testDeltaSumReset[float64]) } +func TestAggregateFiltered(t *testing.T) { + t.Run("PreComputedDelta", testAggregateFiltered(NewPrecomputedDeltaSum[int64](false))) + t.Run("PreComputedCumulativeSum", testAggregateFiltered(NewPrecomputedCumulativeSum[int64](false))) +} + +type af interface{ aggregateFiltered(int64, attribute.Set) } + +func testAggregateFiltered[N int64 | float64](a Aggregator[N]) func(*testing.T) { + attrs := attribute.NewSet(attribute.String("key", "val")) + return func(t *testing.T) { + a.Aggregate(1, attrs) + + require.Implements(t, (*af)(nil), a) + a.(af).aggregateFiltered(1, attrs) + + agg := a.Aggregation() + require.IsType(t, agg, metricdata.Sum[int64]{}) + sum := agg.(metricdata.Sum[int64]) + require.Len(t, sum.DataPoints, 1) + assert.Equal(t, N(2), sum.DataPoints[0].Value) + } +} + func TestEmptySumNilAggregation(t *testing.T) { assert.Nil(t, NewCumulativeSum[int64](true).Aggregation()) assert.Nil(t, NewCumulativeSum[int64](false).Aggregation()) From 222a1cbad1baec678cc63657189c6a9a9d1a6a2c Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 19 Dec 2022 10:11:38 -0800 Subject: [PATCH 03/15] Add fix to changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f310bc2655..4a2154f4880 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) +### Fixed + +- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549) + ### Changed - Global error handler uses an atomic value instead of a mutex. (#3543) From b4630709647b58ab865dccf67202f628aa2b40ed Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 20 Dec 2022 14:15:17 -0800 Subject: [PATCH 04/15] Handle edge case of exact set after filter --- sdk/metric/internal/filter.go | 38 +++++++++++++++++++---------------- sdk/metric/meter_test.go | 20 ++++++++++-------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index ff835928f1c..d11bba96baa 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -21,11 +21,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -type filteredSet struct { - filtered bool - attrs attribute.Set -} - // filter is an aggregator that applies attribute filter when Aggregating. 
filters
 // do not have any backing memory, and must be constructed with a backing Aggregator.
 type filter[N int64 | float64] struct {
 filter attribute.Filter
 aggregator Aggregator[N]

 // Used to aggregate if an aggregator aggregates values differently for
 // spatially reaggregated attributes.
- filtered func(N, attribute.Set)
+ aggregateFiltered func(N, attribute.Set)

 sync.Mutex
- seen map[attribute.Set]filteredSet
+ seen map[attribute.Set]attribute.Set
+ filtered map[attribute.Set]bool
 }

 // NewFilter wraps an Aggregator with an attribute filtering function.
 func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
 if fn == nil {
 return agg
 }
 af, ok := agg.(interface{ aggregateFiltered(N, attribute.Set) })
 if ok {
 return &filter[N]{
- filter: fn,
- aggregator: agg,
- filtered: af.aggregateFiltered,
- seen: make(map[attribute.Set]filteredSet),
+ filter: fn,
+ aggregator: agg,
+ aggregateFiltered: af.aggregateFiltered,
+ seen: make(map[attribute.Set]attribute.Set),
+ // Use distinct filtered and seen to ensure un-filtered attributes
+ // that match the same previously filtered attributes are treated
+ // the same (added, not set).
+ filtered: make(map[attribute.Set]bool),
 }
 }
 return &filter[N]{
 filter: fn,
 aggregator: agg,
- seen: make(map[attribute.Set]filteredSet),
+ seen: make(map[attribute.Set]attribute.Set),
+ // Don't allocate filtered as it won't be used.
 }
 }

@@ -69,10 +70,13 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
 defer f.Unlock()
 fAttr, ok := f.seen[attr]
 if !ok {
- a, na := attr.Filter(f.filter)
- fAttr = filteredSet{filtered: len(na) != 0, attrs: a}
+ var na []attribute.KeyValue
+ fAttr, na = attr.Filter(f.filter)
 f.seen[attr] = fAttr
+ if f.aggregateFiltered != nil && len(na) != 0 {
+ f.filtered[fAttr] = true
+ }
 }
- if fAttr.filtered && f.filtered != nil {
- f.filtered(measurement, fAttr.attrs)
+ if f.aggregateFiltered != nil && f.filtered[fAttr] {
+ f.aggregateFiltered(measurement, fAttr)
 } else {
- f.aggregator.Aggregate(measurement, fAttr.attrs)
+ f.aggregator.Aggregate(measurement, fAttr)
 }
 }
diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go
index 6cbfb512142..da21279e41a 100644
--- a/sdk/metric/meter_test.go
+++ b/sdk/metric/meter_test.go
@@ -645,7 +645,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 }
 _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
 ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
- ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
+ ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"))
+ ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
 })
 return err
 },
@@ -655,7 +656,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 DataPoints: []metricdata.DataPoint[float64]{
 {
 Attributes: attribute.NewSet(attribute.String("foo", "bar")),
- Value: 3.0,
+ Value: 4.0,
 },
 },
 Temporality: temporality,
@@ -672,7 +673,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 }
 _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
 ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
- ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
+ ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"))
+ ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
 })
return err
 },
@@ -682,7 +684,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 DataPoints: []metricdata.DataPoint[float64]{
 {
 Attributes: attribute.NewSet(attribute.String("foo", "bar")),
- Value: 3.0,
+ Value: 4.0,
 },
 },
 Temporality: temporality,
@@ -724,7 +726,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 }
 _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
 ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
- ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
+ ctr.Observe(ctx, 20, attribute.String("foo", "bar"))
+ ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
 })
 return err
 },
@@ -734,7 +737,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 DataPoints: []metricdata.DataPoint[int64]{
 {
 Attributes: attribute.NewSet(attribute.String("foo", "bar")),
- Value: 30,
+ Value: 40,
 },
 },
 Temporality: temporality,
@@ -751,7 +754,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 }
 _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
 ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
- ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
+ ctr.Observe(ctx, 20, attribute.String("foo", "bar"))
+ ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
 })
 return err
 },
@@ -761,7 +765,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
 DataPoints: []metricdata.DataPoint[int64]{
 {
 Attributes: attribute.NewSet(attribute.String("foo", "bar")),
- Value: 30,
+ Value: 40,
 },
 },
 Temporality: temporality,

From e219ee2a29342b1ac2b34ced648a230e6a2adacc Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Wed, 21 Dec 2022 10:06:21 -0800
Subject: [PATCH 05/15] Fix filter and measure algo for precomp

---
 sdk/metric/internal/filter.go | 98 +++++++++++++++++++-----------
 sdk/metric/internal/filter_test.go | 89 +++++++++++++++++++++++++++
 sdk/metric/internal/sum.go | 95 +++++++++++++++++++++++------
 sdk/metric/internal/sum_test.go | 6 +-
 4 files changed, 232 insertions(+), 56 deletions(-)

diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go
index d11bba96baa..a08c61e3d82 100644
--- a/sdk/metric/internal/filter.go
+++ b/sdk/metric/internal/filter.go
@@ -21,19 +21,11 @@ import (
 "go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

-// filter is an aggregator that applies attribute filter when Aggregating. filters
-// do not have any backing memory, and must be constructed with a backing Aggregator.
-type filter[N int64 | float64] struct {
- filter attribute.Filter
- aggregator Aggregator[N]
-
- // Used to aggregate if an aggregator aggregates values differently for
- // spatially reaggregated attributes.
- aggregateFiltered func(N, attribute.Set)
-
- sync.Mutex
- seen map[attribute.Set]attribute.Set
- filtered map[attribute.Set]bool
+type filterAgg[N int64 | float64] interface {
+ Aggregator[N]
+
+ // filtered records values for attributes that have been filtered.
+ filtered(N, attribute.Set)
 }

 // NewFilter wraps an Aggregator with an attribute filtering function.
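 // If agg implements filterAgg, the filtered values are forwarded to its
 // filtered method; otherwise a plain filter wrapper is returned.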
@@ -41,24 +33,27 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg if fn == nil { return agg } - af, ok := agg.(interface{ aggregateFiltered(N, attribute.Set) }) - if ok { - return &filter[N]{ - filter: fn, - aggregator: agg, - aggregateFiltered: af.aggregateFiltered, - seen: make(map[attribute.Set]attribute.Set), - // Use distinct filtered and seen to ensure un-filtered attributes - // that match the same previously filtered attributes is treated - // the same (added, not set). - filtered: make(map[attribute.Set]bool), - } + if fa, ok := agg.(filterAgg[N]); ok { + return newPrecomputedFilter(fa, fn) } + return newFilter(agg, fn) +} + +// filter is an aggregator that applies attribute filter when Aggregating. filters +// do not have any backing memory, and must be constructed with a backing Aggregator. +type filter[N int64 | float64] struct { + filter attribute.Filter + aggregator Aggregator[N] + + sync.Mutex + seen map[attribute.Set]attribute.Set +} + +func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] { return &filter[N]{ filter: fn, aggregator: agg, seen: make(map[attribute.Set]attribute.Set), - // Don't allocate filtered as it won't be used } } @@ -70,18 +65,10 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { defer f.Unlock() fAttr, ok := f.seen[attr] if !ok { - var na []attribute.KeyValue - fAttr, na = attr.Filter(f.filter) + fAttr, _ = attr.Filter(f.filter) f.seen[attr] = fAttr - if f.aggregateFiltered != nil && len(na) != 0 { - f.filtered[fAttr] = true - } - } - if f.aggregateFiltered != nil && f.filtered[fAttr] { - f.aggregateFiltered(measurement, fAttr) - } else { - f.aggregator.Aggregate(measurement, fAttr) } + f.aggregator.Aggregate(measurement, fAttr) } // Aggregation returns an Aggregation, for all the aggregated @@ -89,3 +76,44 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { func (f *filter[N]) Aggregation() metricdata.Aggregation { return f.aggregator.Aggregation() } + +// precomputedFilter is an aggregator that applies attribute filter when +// Aggregating for precomputed Aggregations. The precomputed Aggregations need +// to operate normally when no attribute filtering is done (for sums this means +// setting the value), but when attribute filtering is done it needs to be +// added to any set value. +type precomputedFilter[N int64 | float64] struct { + filter attribute.Filter + aggregator filterAgg[N] + + sync.Mutex + seen map[attribute.Set]attribute.Set +} + +func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] { + return &precomputedFilter[N]{ + filter: fn, + aggregator: agg, + seen: make(map[attribute.Set]attribute.Set), + } +} +func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { + // TODO (#3006): drop stale attributes from seen. + f.Lock() + defer f.Unlock() + fAttr, ok := f.seen[attr] + if !ok { + fAttr, _ = attr.Filter(f.filter) + f.seen[attr] = fAttr + } + if fAttr.Equals(&attr) { + // No filtering done. 
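+ // The filtered attribute set is identical to the original, so the
+ // measurement is recorded with normal set semantics; otherwise it is
+ // summed into the filtered series below.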
+ f.aggregator.Aggregate(measurement, fAttr) + } else { + f.aggregator.filtered(measurement, fAttr) + } +} + +func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation { + return f.aggregator.Aggregation() +} diff --git a/sdk/metric/internal/filter_test.go b/sdk/metric/internal/filter_test.go index e5632156b30..1fd8de240d7 100644 --- a/sdk/metric/internal/filter_test.go +++ b/sdk/metric/internal/filter_test.go @@ -15,6 +15,8 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( + "fmt" + "strings" "sync" "testing" @@ -194,3 +196,90 @@ func TestFilterConcurrent(t *testing.T) { testFilterConcurrent[float64](t) }) } + +func TestPrecomputedFilter(t *testing.T) { + t.Run("Int64", testPrecomputedFilter[int64]()) + t.Run("Float64", testPrecomputedFilter[float64]()) +} + +func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + agg := newTestFilterAgg[N]() + f := NewFilter[N](agg, testAttributeFilter) + require.IsType(t, &precomputedFilter[N]{}, f) + + var ( + powerLevel = attribute.Int("power-level", 9000) + user = attribute.String("user", "Alice") + admin = attribute.Bool("admin", true) + ) + a := attribute.NewSet(powerLevel) + key := a + f.Aggregate(1, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(0), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel, user) + f.Aggregate(2, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(2), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel, user, admin) + f.Aggregate(3, a) + assert.Equal(t, N(1), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(powerLevel) + f.Aggregate(2, a) + assert.Equal(t, N(2), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + + a = attribute.NewSet(user) + f.Aggregate(3, a) + assert.Equal(t, N(2), agg.values[key].measured, str(a)) + assert.Equal(t, N(5), agg.values[key].filtered, str(a)) + assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a)) + + _ = f.Aggregation() + assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation") + } +} + +func str(a attribute.Set) string { + iter := a.Iter() + out := make([]string, 0, iter.Len()) + for iter.Next() { + kv := iter.Attribute() + out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface())) + } + return strings.Join(out, ",") +} + +type testFilterAgg[N int64 | float64] struct { + values map[attribute.Set]precomputedValue[N] + aggregationN int +} + +func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] { + return &testFilterAgg[N]{ + values: make(map[attribute.Set]precomputedValue[N]), + } +} + +func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) { + v := a.values[attr] + v.measured = val + a.values[attr] = v +} + +// nolint: unused // Used to agg filtered. +func (a *testFilterAgg[N]) filtered(val N, attr attribute.Set) { + v := a.values[attr] + v.filtered += val + a.values[attr] = v +} + +func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation { + a.aggregationN++ + return nil +} diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 93244da5907..bef452794af 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -176,7 +176,7 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { // is up to the caller to ensure this is accurate. 
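 // (In practice the reader collecting this output must be configured with a
 // delta temporality selector, as the meter tests above do.)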
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { return &precomputedDeltaSum[N]{ - recorded: make(map[attribute.Set]N), + recorded: make(map[attribute.Set]precomputedValue[N]), reported: make(map[attribute.Set]N), monotonic: monotonic, start: now(), @@ -187,7 +187,7 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] { // aggregation cycles as the delta arithmetic sum. type precomputedDeltaSum[N int64 | float64] struct { sync.Mutex - recorded map[attribute.Set]N + recorded map[attribute.Set]precomputedValue[N] reported map[attribute.Set]N monotonic bool @@ -197,14 +197,18 @@ type precomputedDeltaSum[N int64 | float64] struct { // Aggregate records value as a cumulative sum for attr. func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) { s.Lock() - s.recorded[attr] = value + v := s.recorded[attr] + v.measured = value + s.recorded[attr] = v s.Unlock() } -// aggregateFiltered records value with spatially re-aggregated attrs. -func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter. +// filtered records value with spatially re-aggregated attrs. +func (s *precomputedDeltaSum[N]) filtered(value N, attr attribute.Set) { // nolint: unused // used to filter. s.Lock() - s.recorded[attr] += value + v := s.recorded[attr] + v.filtered += value + s.recorded[attr] = v s.Unlock() } @@ -223,7 +227,8 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { DataPoints: make([]metricdata.DataPoint[N], 0, len(s.recorded)), } for attr, recorded := range s.recorded { - value := recorded - s.reported[attr] + v := recorded.measured + recorded.filtered + value := v - s.reported[attr] out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ Attributes: attr, StartTime: s.start, @@ -231,8 +236,10 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { Value: value, }) if value != 0 { - s.reported[attr] = recorded + s.reported[attr] = v } + recorded.filtered = N(0) + s.recorded[attr] = recorded // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -254,21 +261,75 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { // The output Aggregation will report recorded values as cumulative // temporality. It is up to the caller to ensure this is accurate. func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { - return &precomputedSum[N]{newCumulativeSum[N](monotonic)} + return &precomputedCumulativeSum[N]{ + values: make(map[attribute.Set]precomputedValue[N]), + monotonic: monotonic, + start: now(), + } } -// precomputedSum summarizes a set of measurements recorded over all +// precomputedCumulativeSum summarizes a set of measurements recorded over all // aggregation cycles directly as the cumulative arithmetic sum. -type precomputedSum[N int64 | float64] struct { - *cumulativeSum[N] +type precomputedCumulativeSum[N int64 | float64] struct { + sync.Mutex + values map[attribute.Set]precomputedValue[N] + + monotonic bool + start time.Time } // Aggregate records value as a cumulative sum for attr. 
-func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
- s.set(value, attr)
+func (s *precomputedCumulativeSum[N]) Aggregate(value N, attr attribute.Set) {
+ s.Lock()
+ v := s.values[attr]
+ v.measured = value
+ s.values[attr] = v
+ s.Unlock()
+}
+
+// filtered records value with spatially re-aggregated attrs.
+func (s *precomputedCumulativeSum[N]) filtered(value N, attr attribute.Set) { // nolint: unused // used to filter.
+ s.Lock()
+ v := s.values[attr]
+ v.filtered += value
+ s.values[attr] = v
+ s.Unlock()
+}
+
+func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
+ s.Lock()
+ defer s.Unlock()
+
+ if len(s.values) == 0 {
+ return nil
+ }
+
+ t := now()
+ out := metricdata.Sum[N]{
+ Temporality: metricdata.CumulativeTemporality,
+ IsMonotonic: s.monotonic,
+ DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
+ }
+ for attr, value := range s.values {
+ out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
+ Attributes: attr,
+ StartTime: s.start,
+ Time: t,
+ Value: value.measured + value.filtered,
+ })
+ value.filtered = N(0)
+ s.values[attr] = value
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ }
+ return out
 }

-// aggregateFiltered records value with spatially re-aggregated attrs.
-func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // used to filter.
- s.valueMap.Aggregate(value, attr)
+type precomputedValue[N int64 | float64] struct {
+ // measured is the value directly measured.
+ measured N
+ // filtered is the sum of values from spatial aggregations.
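+ // It is zeroed after each collection cycle so filtered values do not
+ // persist across Aggregation calls.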
+ filtered N } diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index 19671aec2e1..72026b4fbb6 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -169,15 +169,13 @@ func TestAggregateFiltered(t *testing.T) { t.Run("PreComputedCumulativeSum", testAggregateFiltered(NewPrecomputedCumulativeSum[int64](false))) } -type af interface{ aggregateFiltered(int64, attribute.Set) } - func testAggregateFiltered[N int64 | float64](a Aggregator[N]) func(*testing.T) { attrs := attribute.NewSet(attribute.String("key", "val")) return func(t *testing.T) { a.Aggregate(1, attrs) - require.Implements(t, (*af)(nil), a) - a.(af).aggregateFiltered(1, attrs) + require.Implements(t, (*filterAgg[N])(nil), a) + a.(filterAgg[N]).filtered(1, attrs) agg := a.Aggregation() require.IsType(t, agg, metricdata.Sum[int64]{}) From cd34714324fcb318f9515ec1b421f8d5be923319 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 21 Dec 2022 11:16:22 -0800 Subject: [PATCH 06/15] Add tests for precomp sums --- sdk/metric/internal/sum_test.go | 135 ++++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 14 deletions(-) diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go index 72026b4fbb6..b3e18d764da 100644 --- a/sdk/metric/internal/sum_test.go +++ b/sdk/metric/internal/sum_test.go @@ -164,25 +164,132 @@ func TestDeltaSumReset(t *testing.T) { t.Run("Float64", testDeltaSumReset[float64]) } -func TestAggregateFiltered(t *testing.T) { - t.Run("PreComputedDelta", testAggregateFiltered(NewPrecomputedDeltaSum[int64](false))) - t.Run("PreComputedCumulativeSum", testAggregateFiltered(NewPrecomputedCumulativeSum[int64](false))) -} +func TestPreComputedDeltaSum(t *testing.T) { + var mono bool + agg := NewPrecomputedDeltaSum[int64](mono) + require.Implements(t, (*filterAgg[int64])(nil), agg) -func testAggregateFiltered[N int64 | float64](a Aggregator[N]) func(*testing.T) { attrs := attribute.NewSet(attribute.String("key", "val")) - return func(t *testing.T) { - a.Aggregate(1, attrs) + agg.Aggregate(1, attrs) + got := agg.Aggregation() + want := metricdata.Sum[int64]{ + IsMonotonic: mono, + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, + } + opt := metricdatatest.IgnoreTimestamp() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Delta values should zero. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + agg.(filterAgg[int64]).filtered(1, attrs) + got = agg.Aggregation() + // measured(+): 1, previous(-): 1, filtered(+): 1 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + // measured(+): 1, previous(-): 2, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + got = agg.Aggregation() + // measured(+): 1, previous(-): 1, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Override set value. + agg.Aggregate(2, attrs) + agg.Aggregate(5, attrs) + // Filtered should add. 
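+ // Both filtered calls accumulate: 3 + 10 = 13 is added to the measured 5.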
+ agg.(filterAgg[int64]).filtered(3, attrs) + agg.(filterAgg[int64]).filtered(10, attrs) + got = agg.Aggregation() + // measured(+): 5, previous(-): 1, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + agg.Aggregate(5, attrs) + got = agg.Aggregation() + // measured(+): 5, previous(-): 18, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Order should not affect measure. + // Filtered should add. + agg.(filterAgg[int64]).filtered(3, attrs) + agg.Aggregate(7, attrs) + agg.(filterAgg[int64]).filtered(10, attrs) + got = agg.Aggregation() + // measured(+): 7, previous(-): 5, filtered(+): 13 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + agg.Aggregate(7, attrs) + got = agg.Aggregation() + // measured(+): 7, previous(-): 20, filtered(+): 0 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) +} - require.Implements(t, (*filterAgg[N])(nil), a) - a.(filterAgg[N]).filtered(1, attrs) +func TestPreComputedCumulativeSum(t *testing.T) { + var mono bool + agg := NewPrecomputedCumulativeSum[int64](mono) + require.Implements(t, (*filterAgg[int64])(nil), agg) - agg := a.Aggregation() - require.IsType(t, agg, metricdata.Sum[int64]{}) - sum := agg.(metricdata.Sum[int64]) - require.Len(t, sum.DataPoints, 1) - assert.Equal(t, N(2), sum.DataPoints[0].Value) + attrs := attribute.NewSet(attribute.String("key", "val")) + agg.Aggregate(1, attrs) + got := agg.Aggregation() + want := metricdata.Sum[int64]{ + IsMonotonic: mono, + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, } + opt := metricdatatest.IgnoreTimestamp() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Cumulative values should persist. + got = agg.Aggregation() + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + agg.(filterAgg[int64]).filtered(1, attrs) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Override set value. + agg.Aggregate(5, attrs) + // Filtered should add. + agg.(filterAgg[int64]).filtered(3, attrs) + agg.(filterAgg[int64]).filtered(10, attrs) + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Filtered values should not persist. + got = agg.Aggregation() + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)} + metricdatatest.AssertAggregationsEqual(t, want, got, opt) + + // Order should not affect measure. + // Filtered should add. 
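+ // 7 (measured) + 3 + 10 (filtered) = 20, regardless of call order.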
+ agg.(filterAgg[int64]).filtered(3, attrs)
+ agg.Aggregate(7, attrs)
+ agg.(filterAgg[int64]).filtered(10, attrs)
+ got = agg.Aggregation()
+ want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
+ metricdatatest.AssertAggregationsEqual(t, want, got, opt)
+ got = agg.Aggregation()
+ want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
+ metricdatatest.AssertAggregationsEqual(t, want, got, opt)
+}
+
 func TestEmptySumNilAggregation(t *testing.T) {
 assert.Nil(t, NewCumulativeSum[int64](true).Aggregation())
 assert.Nil(t, NewCumulativeSum[int64](false).Aggregation())

From 954145f4909d45929e9196c6a5c2fd5665905c8d Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Wed, 21 Dec 2022 12:38:28 -0800
Subject: [PATCH 07/15] Unify precomputedMap

---
 sdk/metric/internal/sum.go | 127 +++++++++++++++++--------------------
 1 file changed, 57 insertions(+), 70 deletions(-)

diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go
index bef452794af..2fff3ba00d8 100644
--- a/sdk/metric/internal/sum.go
+++ b/sdk/metric/internal/sum.go
@@ -22,7 +22,7 @@ import (
 "go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

-// valueMap is the storage for all sums.
+// valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 sync.Mutex
 values map[attribute.Set]N
@@ -32,12 +32,6 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
 return &valueMap[N]{values: make(map[attribute.Set]N)}
 }

-func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used.
- s.Lock()
- s.values[attr] = value
- s.Unlock()
-}
-
 func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
 s.Lock()
 s.values[attr] += value
 s.Unlock()
@@ -164,6 +158,43 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
 return out
 }

+type precomputedValue[N int64 | float64] struct {
+ // measured is the value directly measured.
+ measured N
+ // filtered is the sum of values from spatial aggregations.
+ filtered N
+}
+
+// precomputedMap is the storage for precomputed sums.
+type precomputedMap[N int64 | float64] struct {
+ sync.Mutex
+ values map[attribute.Set]precomputedValue[N]
+}
+
+func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
+ return &precomputedMap[N]{
+ values: make(map[attribute.Set]precomputedValue[N]),
+ }
+}
+
+// Aggregate records value as the precomputed sum for attr.
+func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
+ s.Lock()
+ v := s.values[attr]
+ v.measured = value
+ s.values[attr] = v
+ s.Unlock()
+}
+
+// filtered records value with spatially re-aggregated attrs.
+func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
+ s.Lock()
+ v := s.values[attr]
+ v.filtered += value
+ s.values[attr] = v
+ s.Unlock()
+}
+
 // NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
 // measurements as their pre-computed arithmetic sum. Each sum is scoped by
 // attributes and the aggregation cycle the measurements were made in.
@@ -204,12 +235,12 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
 // is up to the caller to ensure this is accurate.
 func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
 return &precomputedDeltaSum[N]{
- recorded: make(map[attribute.Set]precomputedValue[N]),
- reported: make(map[attribute.Set]N),
- monotonic: monotonic,
- start: now(),
+ precomputedMap: newPrecomputedMap[N](),
+ reported: make(map[attribute.Set]N),
+ monotonic: monotonic,
+ start: now(),
 }
 }

 // precomputedDeltaSum summarizes a set of measurements recorded over all
 // aggregation cycles as the delta arithmetic sum.
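 // Each reported delta is (measured + filtered) minus the total previously
 // reported for that attribute set.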
type precomputedDeltaSum[N int64 | float64] struct { - sync.Mutex - recorded map[attribute.Set]precomputedValue[N] + *precomputedMap[N] + reported map[attribute.Set]N monotonic bool start time.Time } -// Aggregate records value as a cumulative sum for attr. -func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) { - s.Lock() - v := s.recorded[attr] - v.measured = value - s.recorded[attr] = v - s.Unlock() -} - -// filtered records value with spatially re-aggregated attrs. -func (s *precomputedDeltaSum[N]) filtered(value N, attr attribute.Set) { // nolint: unused // used to filter. - s.Lock() - v := s.recorded[attr] - v.filtered += value - s.recorded[attr] = v - s.Unlock() -} - func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() - if len(s.recorded) == 0 { + if len(s.values) == 0 { return nil } @@ -224,22 +237,22 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { out := metricdata.Sum[N]{ Temporality: metricdata.DeltaTemporality, IsMonotonic: s.monotonic, - DataPoints: make([]metricdata.DataPoint[N], 0, len(s.recorded)), + DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), } - for attr, recorded := range s.recorded { - v := recorded.measured + recorded.filtered - value := v - s.reported[attr] + for attr, value := range s.values { + v := value.measured + value.filtered + delta := v - s.reported[attr] out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ Attributes: attr, StartTime: s.start, Time: t, - Value: value, + Value: delta, }) - if value != 0 { + if delta != 0 { s.reported[attr] = v } - recorded.filtered = N(0) - s.recorded[attr] = recorded + value.filtered = N(0) + s.values[attr] = value // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -262,40 +275,21 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { // temporality. It is up to the caller to ensure this is accurate. func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] { return &precomputedCumulativeSum[N]{ - values: make(map[attribute.Set]precomputedValue[N]), - monotonic: monotonic, - start: now(), + precomputedMap: newPrecomputedMap[N](), + monotonic: monotonic, + start: now(), } } // precomputedCumulativeSum summarizes a set of measurements recorded over all // aggregation cycles directly as the cumulative arithmetic sum. type precomputedCumulativeSum[N int64 | float64] struct { - sync.Mutex - values map[attribute.Set]precomputedValue[N] + *precomputedMap[N] monotonic bool start time.Time } -// Aggregate records value as a cumulative sum for attr. -func (s *precomputedCumulativeSum[N]) Aggregate(value N, attr attribute.Set) { - s.Lock() - v := s.values[attr] - v.measured = value - s.values[attr] = v - s.Unlock() -} - -// filtered records value with spatially re-aggregated attrs. -func (s *precomputedCumulativeSum[N]) filtered(value N, attr attribute.Set) { // nolint: unused // used to filter. - s.Lock() - v := s.values[attr] - v.filtered += value - s.values[attr] = v - s.Unlock() -} - func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { s.Lock() defer s.Unlock() @@ -326,10 +320,3 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { } return out } - -type precomputedValue[N int64 | float64] struct { - // measured is the value directly measured. 
- measured N
- // filtered is the sum of values from spatial aggregations.
- filtered N
-}

From 876827b263ca44e971538a4e7b63df8b67d254c7 Mon Sep 17 00:00:00 2001
From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com>
Date: Thu, 5 Jan 2023 12:43:52 -0600
Subject: [PATCH 08/15] Add example from supplemental guide

---
 sdk/metric/meter_example_test.go | 375 +++++++++++++++++++++++++++++++
 1 file changed, 375 insertions(+)
 create mode 100644 sdk/metric/meter_example_test.go

diff --git a/sdk/metric/meter_example_test.go b/sdk/metric/meter_example_test.go
new file mode 100644
index 00000000000..5bada1a1924
--- /dev/null
+++ b/sdk/metric/meter_example_test.go
@@ -0,0 +1,375 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric_test
+
+import (
+ "context"
+ "sync/atomic"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/metric/instrument"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
+)
+
+// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example
+func TestCumulativeAsynchronousExample(t *testing.T) {
+ ctx := context.Background()
+ filter := attribute.Filter(func(kv attribute.KeyValue) bool {
+ if kv.Key == "tid" {
+ return false
+ }
+ return true
+ })
+ reader := metric.NewManualReader()
+
+ defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"})
+ filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter})
+
+ meter := metric.NewMeterProvider(
+ metric.WithReader(reader),
+ metric.WithView(defaultView),
+ metric.WithView(filteredView),
+ ).Meter("AsynchronousExample")
+
+ ctr, err := meter.Int64ObservableCounter("pageFaults")
+ assert.NoError(t, err)
+
+ tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)}
+ tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)}
+ tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)}
+
+ attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs}
+
+ pfValues := []int64{0, 0, 0}
+
+ meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
+ for i := range pfValues {
+ if pfValues[i] != 0 {
+ ctr.Observe(ctx, pfValues[i], attrs[i]...)
+ } + } + }) + + filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + atomic.StoreInt64(&pfValues[0], 50) + atomic.StoreInt64(&pfValues[1], 30) + + wantScopeMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "AsynchronousExample"}, + Metrics: []metricdata.Metrics{ + { + Name: "filteredPageFaults", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: filteredAttributeSet, + Value: 80, + }, + }, + }, + }, + { + Name: "pageFaults", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(tid1Attrs...), + Value: 50, + }, + { + Attributes: attribute.NewSet(tid2Attrs...), + Value: 30, + }, + }, + }, + }, + }, + } + + metrics, err := reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + + atomic.StoreInt64(&pfValues[0], 53) + atomic.StoreInt64(&pfValues[1], 38) + + *wantFilterValue = 91 + *wantDataPoint1Value = 53 + *wantDataPoint2Value = 38 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + + atomic.StoreInt64(&pfValues[0], 56) + atomic.StoreInt64(&pfValues[1], 42) + + *wantFilterValue = 98 + *wantDataPoint1Value = 56 + *wantDataPoint2Value = 42 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + + atomic.StoreInt64(&pfValues[0], 60) + atomic.StoreInt64(&pfValues[1], 47) + + *wantFilterValue = 107 + *wantDataPoint1Value = 60 + *wantDataPoint2Value = 47 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + + atomic.StoreInt64(&pfValues[0], 0) + atomic.StoreInt64(&pfValues[1], 53) + atomic.StoreInt64(&pfValues[2], 5) + + *wantFilterValue = 58 + wantAgg := metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(tid1Attrs...), + Value: 60, + }, + { + Attributes: attribute.NewSet(tid2Attrs...), + Value: 53, + }, + { + Attributes: attribute.NewSet(tid3Attrs...), + Value: 5, + }, + }, + } + wantScopeMetrics.Metrics[1].Data = wantAgg + + metrics, 
err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + +} + +// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example + +func TestDeltaAsynchronousExample(t *testing.T) { + ctx := context.Background() + filter := attribute.Filter(func(kv attribute.KeyValue) bool { + if kv.Key == "tid" { + return false + } + return true + }) + reader := metric.NewManualReader(metric.WithTemporalitySelector(func(ik metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality })) + + defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"}) + filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter}) + + meter := metric.NewMeterProvider( + metric.WithReader(reader), + metric.WithView(defaultView), + metric.WithView(filteredView), + ).Meter("AsynchronousExample") + + ctr, err := meter.Int64ObservableCounter("pageFaults") + assert.NoError(t, err) + + tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)} + tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)} + tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)} + + attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs} + + pfValues := []int64{0, 0, 0} + + meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + for i := range pfValues { + if pfValues[i] != 0 { + ctr.Observe(ctx, pfValues[i], attrs[i]...) + } + } + }) + + filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + atomic.StoreInt64(&pfValues[0], 50) + atomic.StoreInt64(&pfValues[1], 30) + + wantScopeMetrics := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "AsynchronousExample"}, + Metrics: []metricdata.Metrics{ + { + Name: "filteredPageFaults", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: filteredAttributeSet, + Value: 80, + }, + }, + }, + }, + { + Name: "pageFaults", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(tid1Attrs...), + Value: 50, + }, + { + Attributes: attribute.NewSet(tid2Attrs...), + Value: 30, + }, + }, + }, + }, + }, + } + + metrics, err := reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + + atomic.StoreInt64(&pfValues[0], 53) + atomic.StoreInt64(&pfValues[1], 38) + + *wantFilterValue = 11 + *wantDataPoint1Value = 3 + *wantDataPoint2Value = 
8 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + + atomic.StoreInt64(&pfValues[0], 56) + atomic.StoreInt64(&pfValues[1], 42) + + *wantFilterValue = 7 + *wantDataPoint1Value = 3 + *wantDataPoint2Value = 4 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + + atomic.StoreInt64(&pfValues[0], 60) + atomic.StoreInt64(&pfValues[1], 47) + + *wantFilterValue = 9 + *wantDataPoint1Value = 4 + *wantDataPoint2Value = 5 + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + + atomic.StoreInt64(&pfValues[0], 0) + atomic.StoreInt64(&pfValues[1], 53) + atomic.StoreInt64(&pfValues[2], 5) + + *wantFilterValue = -49 // TODO: This should be 11 + + wantAgg := metricdata.Sum[int64]{ + Temporality: metricdata.DeltaTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(tid1Attrs...), + Value: 0, + }, + { + Attributes: attribute.NewSet(tid2Attrs...), + Value: 6, + }, + { + Attributes: attribute.NewSet(tid3Attrs...), + Value: 5, + }, + }, + } + wantScopeMetrics.Metrics[1].Data = wantAgg + + metrics, err = reader.Collect(ctx) + assert.NoError(t, err) + + metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) +} From 79e0f4adce2c68f6c3efeadc50a65538ae325e55 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Thu, 5 Jan 2023 12:51:07 -0600 Subject: [PATCH 09/15] Fixes for lint --- sdk/metric/meter_example_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/sdk/metric/meter_example_test.go b/sdk/metric/meter_example_test.go index 5bada1a1924..4dcfcbabb78 100644 --- a/sdk/metric/meter_example_test.go +++ b/sdk/metric/meter_example_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -32,10 +33,7 @@ import ( func TestCumulativeAsynchronousExample(t *testing.T) { ctx := context.Background() filter := attribute.Filter(func(kv attribute.KeyValue) bool { - if kv.Key == "tid" { - return false - } - return true + return kv.Key != "tid" }) reader := metric.NewManualReader() @@ -59,13 +57,14 @@ func TestCumulativeAsynchronousExample(t *testing.T) { pfValues := []int64{0, 0, 0} - meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { for i := range pfValues { if pfValues[i] != 0 { ctr.Observe(ctx, pfValues[i], attrs[i]...) 
} } }) + assert.NoError(t, err) filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) @@ -197,7 +196,6 @@ func TestCumulativeAsynchronousExample(t *testing.T) { metrics, err = reader.Collect(ctx) assert.NoError(t, err) metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - } // This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example @@ -205,10 +203,7 @@ func TestCumulativeAsynchronousExample(t *testing.T) { func TestDeltaAsynchronousExample(t *testing.T) { ctx := context.Background() filter := attribute.Filter(func(kv attribute.KeyValue) bool { - if kv.Key == "tid" { - return false - } - return true + return kv.Key != "tid" }) reader := metric.NewManualReader(metric.WithTemporalitySelector(func(ik metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality })) @@ -232,13 +227,14 @@ func TestDeltaAsynchronousExample(t *testing.T) { pfValues := []int64{0, 0, 0} - meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { for i := range pfValues { if pfValues[i] != 0 { ctr.Observe(ctx, pfValues[i], attrs[i]...) } } }) + assert.NoError(t, err) filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) From 406a4bfd91e4a4a0f28ca24f2da2cfbdbb0f192a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 13 Jan 2023 09:33:06 -0800 Subject: [PATCH 10/15] Update sdk/metric/meter_example_test.go --- sdk/metric/meter_example_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/meter_example_test.go b/sdk/metric/meter_example_test.go index 4dcfcbabb78..ba3e5e7acc3 100644 --- a/sdk/metric/meter_example_test.go +++ b/sdk/metric/meter_example_test.go @@ -342,7 +342,7 @@ func TestDeltaAsynchronousExample(t *testing.T) { atomic.StoreInt64(&pfValues[1], 53) atomic.StoreInt64(&pfValues[2], 5) - *wantFilterValue = -49 // TODO: This should be 11 + *wantFilterValue = -49 wantAgg := metricdata.Sum[int64]{ Temporality: metricdata.DeltaTemporality, From 657c0694a022fed7763ec167788411106b17eeea Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 13 Jan 2023 10:58:53 -0800 Subject: [PATCH 11/15] Fix async example test --- sdk/metric/meter_example_test.go | 371 ------------------------------- sdk/metric/meter_test.go | 253 +++++++++++++++++++++ 2 files changed, 253 insertions(+), 371 deletions(-) delete mode 100644 sdk/metric/meter_example_test.go diff --git a/sdk/metric/meter_example_test.go b/sdk/metric/meter_example_test.go deleted file mode 100644 index ba3e5e7acc3..00000000000 --- a/sdk/metric/meter_example_test.go +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package metric_test - -import ( - "context" - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" -) - -// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example -func TestCumulativeAsynchronousExample(t *testing.T) { - ctx := context.Background() - filter := attribute.Filter(func(kv attribute.KeyValue) bool { - return kv.Key != "tid" - }) - reader := metric.NewManualReader() - - defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"}) - filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter}) - - meter := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithView(defaultView), - metric.WithView(filteredView), - ).Meter("AsynchronousExample") - - ctr, err := meter.Int64ObservableCounter("pageFaults") - assert.NoError(t, err) - - tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)} - tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)} - tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)} - - attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs} - - pfValues := []int64{0, 0, 0} - - _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { - for i := range pfValues { - if pfValues[i] != 0 { - ctr.Observe(ctx, pfValues[i], attrs[i]...) 
- } - } - }) - assert.NoError(t, err) - - filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) - - // During the time range (T0, T1]: - // pid = 1001, tid = 1, #PF = 50 - // pid = 1001, tid = 2, #PF = 30 - atomic.StoreInt64(&pfValues[0], 50) - atomic.StoreInt64(&pfValues[1], 30) - - wantScopeMetrics := metricdata.ScopeMetrics{ - Scope: instrumentation.Scope{Name: "AsynchronousExample"}, - Metrics: []metricdata.Metrics{ - { - Name: "filteredPageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: filteredAttributeSet, - Value: 80, - }, - }, - }, - }, - { - Name: "pageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 50, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 30, - }, - }, - }, - }, - }, - } - - metrics, err := reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value - - // During the time range (T1, T2]: - // pid = 1001, tid = 1, #PF = 53 - // pid = 1001, tid = 2, #PF = 38 - - atomic.StoreInt64(&pfValues[0], 53) - atomic.StoreInt64(&pfValues[1], 38) - - *wantFilterValue = 91 - *wantDataPoint1Value = 53 - *wantDataPoint2Value = 38 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T2, T3] - // pid = 1001, tid = 1, #PF = 56 - // pid = 1001, tid = 2, #PF = 42 - - atomic.StoreInt64(&pfValues[0], 56) - atomic.StoreInt64(&pfValues[1], 42) - - *wantFilterValue = 98 - *wantDataPoint1Value = 56 - *wantDataPoint2Value = 42 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T3, T4]: - // pid = 1001, tid = 1, #PF = 60 - // pid = 1001, tid = 2, #PF = 47 - - atomic.StoreInt64(&pfValues[0], 60) - atomic.StoreInt64(&pfValues[1], 47) - - *wantFilterValue = 107 - *wantDataPoint1Value = 60 - *wantDataPoint2Value = 47 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T4, T5]: - // thread 1 died, thread 3 started - // pid = 1001, tid = 2, #PF = 53 - // pid = 1001, tid = 3, #PF = 5 - - atomic.StoreInt64(&pfValues[0], 0) - atomic.StoreInt64(&pfValues[1], 53) - atomic.StoreInt64(&pfValues[2], 5) - - *wantFilterValue = 58 - wantAgg := metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 60, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 53, - }, - { - Attributes: attribute.NewSet(tid3Attrs...), - Value: 5, - }, - }, - } - 
wantScopeMetrics.Metrics[1].Data = wantAgg - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) -} - -// This example can be found: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/supplementary-guidelines.md#asynchronous-example - -func TestDeltaAsynchronousExample(t *testing.T) { - ctx := context.Background() - filter := attribute.Filter(func(kv attribute.KeyValue) bool { - return kv.Key != "tid" - }) - reader := metric.NewManualReader(metric.WithTemporalitySelector(func(ik metric.InstrumentKind) metricdata.Temporality { return metricdata.DeltaTemporality })) - - defaultView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "pageFaults"}) - filteredView := metric.NewView(metric.Instrument{Name: "pageFaults"}, metric.Stream{Name: "filteredPageFaults", AttributeFilter: filter}) - - meter := metric.NewMeterProvider( - metric.WithReader(reader), - metric.WithView(defaultView), - metric.WithView(filteredView), - ).Meter("AsynchronousExample") - - ctr, err := meter.Int64ObservableCounter("pageFaults") - assert.NoError(t, err) - - tid1Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 1)} - tid2Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 2)} - tid3Attrs := []attribute.KeyValue{attribute.String("pid", "1001"), attribute.Int("tid", 3)} - - attrs := [][]attribute.KeyValue{tid1Attrs, tid2Attrs, tid3Attrs} - - pfValues := []int64{0, 0, 0} - - _, err = meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { - for i := range pfValues { - if pfValues[i] != 0 { - ctr.Observe(ctx, pfValues[i], attrs[i]...) 
- } - } - }) - assert.NoError(t, err) - - filteredAttributeSet := attribute.NewSet(attribute.KeyValue{Key: "pid", Value: attribute.StringValue("1001")}) - - // During the time range (T0, T1]: - // pid = 1001, tid = 1, #PF = 50 - // pid = 1001, tid = 2, #PF = 30 - atomic.StoreInt64(&pfValues[0], 50) - atomic.StoreInt64(&pfValues[1], 30) - - wantScopeMetrics := metricdata.ScopeMetrics{ - Scope: instrumentation.Scope{Name: "AsynchronousExample"}, - Metrics: []metricdata.Metrics{ - { - Name: "filteredPageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: filteredAttributeSet, - Value: 80, - }, - }, - }, - }, - { - Name: "pageFaults", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 50, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 30, - }, - }, - }, - }, - }, - } - - metrics, err := reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - wantFilterValue := &wantScopeMetrics.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint1Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantDataPoint2Value := &wantScopeMetrics.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value - - // During the time range (T1, T2]: - // pid = 1001, tid = 1, #PF = 53 - // pid = 1001, tid = 2, #PF = 38 - - atomic.StoreInt64(&pfValues[0], 53) - atomic.StoreInt64(&pfValues[1], 38) - - *wantFilterValue = 11 - *wantDataPoint1Value = 3 - *wantDataPoint2Value = 8 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T2, T3] - // pid = 1001, tid = 1, #PF = 56 - // pid = 1001, tid = 2, #PF = 42 - - atomic.StoreInt64(&pfValues[0], 56) - atomic.StoreInt64(&pfValues[1], 42) - - *wantFilterValue = 7 - *wantDataPoint1Value = 3 - *wantDataPoint2Value = 4 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T3, T4]: - // pid = 1001, tid = 1, #PF = 60 - // pid = 1001, tid = 2, #PF = 47 - - atomic.StoreInt64(&pfValues[0], 60) - atomic.StoreInt64(&pfValues[1], 47) - - *wantFilterValue = 9 - *wantDataPoint1Value = 4 - *wantDataPoint2Value = 5 - - metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) - - // During the time range (T4, T5]: - // thread 1 died, thread 3 started - // pid = 1001, tid = 2, #PF = 53 - // pid = 1001, tid = 3, #PF = 5 - - atomic.StoreInt64(&pfValues[0], 0) - atomic.StoreInt64(&pfValues[1], 53) - atomic.StoreInt64(&pfValues[2], 5) - - *wantFilterValue = -49 - - wantAgg := metricdata.Sum[int64]{ - Temporality: metricdata.DeltaTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet(tid1Attrs...), - Value: 0, - }, - { - Attributes: attribute.NewSet(tid2Attrs...), - Value: 6, - }, - { - Attributes: attribute.NewSet(tid3Attrs...), - Value: 5, - }, - }, - } - wantScopeMetrics.Metrics[1].Data = wantAgg - - 
metrics, err = reader.Collect(ctx) - assert.NoError(t, err) - - metricdatatest.AssertEqual(t, wantScopeMetrics, metrics.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) -} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index c38c44d8b9b..6e85ff85d78 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1042,6 +1042,259 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { } } +func TestAsynchronousExample(t *testing.T) { + // This example can be found: + // https://github.com/open-telemetry/opentelemetry-specification/blob/8b91585e6175dd52b51e1d60bea105041225e35d/specification/metrics/supplementary-guidelines.md#asynchronous-example + var ( + threadID1 = attribute.Int("tid", 1) + threadID2 = attribute.Int("tid", 2) + threadID3 = attribute.Int("tid", 3) + + processID1001 = attribute.String("pid", "1001") + + thread1 = attribute.NewSet(processID1001, threadID1) + thread2 = attribute.NewSet(processID1001, threadID2) + thread3 = attribute.NewSet(processID1001, threadID3) + + process1001 = attribute.NewSet(processID1001) + ) + + setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics) { + t.Helper() + + const ( + instName = "pageFaults" + filteredStream = "filteredPageFaults" + scopeName = "AsynchronousExample" + ) + + selector := func(InstrumentKind) metricdata.Temporality { return temp } + reader := NewManualReader(WithTemporalitySelector(selector)) + + noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName}) + + filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" } + filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter}) + + mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered)) + meter := mp.Meter(scopeName) + + observations := make(map[attribute.Set]int64) + _, err := meter.Int64ObservableCounter(instName, instrument.WithInt64Callback( + func(ctx context.Context, o instrument.Int64Observer) error { + for attrSet, val := range observations { + o.Observe(ctx, val, attrSet.ToSlice()...) 
+ } + return nil + }, + )) + require.NoError(t, err) + + want := &metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: scopeName}, + Metrics: []metricdata.Metrics{{Name: filteredStream}, {Name: instName}}, + } + + collect := func(t *testing.T) { + t.Helper() + got, err := reader.Collect(context.Background()) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) + } + + return observations, collect, want + } + + t.Run("Cumulative", func(t *testing.T) { + temporality := metricdata.CumulativeTemporality + observations, verify, want := setup(t, temporality) + + want.Metrics[0].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + } + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 91 + *wantThread1 = 53 + *wantThread2 = 38 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 98 + *wantThread1 = 56 + *wantThread2 = 42 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 107 + *wantThread1 = 60 + *wantThread2 = 47 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = 58 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. 
+ {Attributes: thread1, Value: 60}, + {Attributes: thread2, Value: 53}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) + + t.Run("Delta", func(t *testing.T) { + temporality := metricdata.DeltaTemporality + observations, verify, want := setup(t, temporality) + + want.Metrics[0].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + } + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + + // During the time range (T0, T1]: + // pid = 1001, tid = 1, #PF = 50 + // pid = 1001, tid = 2, #PF = 30 + observations[thread1] = 50 + observations[thread2] = 30 + + *wantFiltered = 80 + *wantThread1 = 50 + *wantThread2 = 30 + + verify(t) + + // During the time range (T1, T2]: + // pid = 1001, tid = 1, #PF = 53 + // pid = 1001, tid = 2, #PF = 38 + observations[thread1] = 53 + observations[thread2] = 38 + + *wantFiltered = 11 + *wantThread1 = 3 + *wantThread2 = 8 + + verify(t) + + // During the time range (T2, T3] + // pid = 1001, tid = 1, #PF = 56 + // pid = 1001, tid = 2, #PF = 42 + observations[thread1] = 56 + observations[thread2] = 42 + + *wantFiltered = 7 + *wantThread1 = 3 + *wantThread2 = 4 + + verify(t) + + // During the time range (T3, T4]: + // pid = 1001, tid = 1, #PF = 60 + // pid = 1001, tid = 2, #PF = 47 + observations[thread1] = 60 + observations[thread2] = 47 + + *wantFiltered = 9 + *wantThread1 = 4 + *wantThread2 = 5 + + verify(t) + + // During the time range (T4, T5]: + // thread 1 died, thread 3 started + // pid = 1001, tid = 2, #PF = 53 + // pid = 1001, tid = 3, #PF = 5 + delete(observations, thread1) + observations[thread2] = 53 + observations[thread3] = 5 + + *wantFiltered = -49 + want.Metrics[1].Data = metricdata.Sum[int64]{ + Temporality: temporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + // Thread 1 remains at last measured value. 
+ {Attributes: thread1, Value: 0}, + {Attributes: thread2, Value: 6}, + {Attributes: thread3, Value: 5}, + }, + } + + verify(t) + }) +} + var ( aiCounter instrument.Int64ObservableCounter aiUpDownCounter instrument.Int64ObservableUpDownCounter From f2954944004547350dc71bbc6b482e4e0aa54868 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 13 Jan 2023 11:17:36 -0800 Subject: [PATCH 12/15] Reduce duplicate code in TestAsynchronousExample --- sdk/metric/meter_test.go | 75 +++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 44 deletions(-) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 6e85ff85d78..cfe8accd945 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1059,7 +1059,7 @@ func TestAsynchronousExample(t *testing.T) { process1001 = attribute.NewSet(processID1001) ) - setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics) { + setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics, *int64, *int64, *int64) { t.Helper() const ( @@ -1091,9 +1091,34 @@ func TestAsynchronousExample(t *testing.T) { require.NoError(t, err) want := &metricdata.ScopeMetrics{ - Scope: instrumentation.Scope{Name: scopeName}, - Metrics: []metricdata.Metrics{{Name: filteredStream}, {Name: instName}}, + Scope: instrumentation.Scope{Name: scopeName}, + Metrics: []metricdata.Metrics{ + { + Name: filteredStream, + Data: metricdata.Sum[int64]{ + Temporality: temp, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: process1001}, + }, + }, + }, + { + Name: instName, + Data: metricdata.Sum[int64]{ + Temporality: temp, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: thread1}, + {Attributes: thread2}, + }, + }, + }, + }, } + wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value + wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value collect := func(t *testing.T) { t.Helper() @@ -1103,31 +1128,12 @@ func TestAsynchronousExample(t *testing.T) { metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) } - return observations, collect, want + return observations, collect, want, wantFiltered, wantThread1, wantThread2 } t.Run("Cumulative", func(t *testing.T) { temporality := metricdata.CumulativeTemporality - observations, verify, want := setup(t, temporality) - - want.Metrics[0].Data = metricdata.Sum[int64]{ - Temporality: temporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - {Attributes: process1001}, - }, - } - want.Metrics[1].Data = metricdata.Sum[int64]{ - Temporality: temporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - {Attributes: thread1}, - {Attributes: thread2}, - }, - } - wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality) // During the time range (T0, T1]: // pid = 1001, tid = 1, #PF = 50 @@ -1202,26 +1208,7 @@ func TestAsynchronousExample(t *testing.T) { t.Run("Delta", func(t *testing.T) { temporality := metricdata.DeltaTemporality - 
observations, verify, want := setup(t, temporality) - - want.Metrics[0].Data = metricdata.Sum[int64]{ - Temporality: temporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - {Attributes: process1001}, - }, - } - want.Metrics[1].Data = metricdata.Sum[int64]{ - Temporality: temporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - {Attributes: thread1}, - {Attributes: thread2}, - }, - } - wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value - wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value + observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality) // During the time range (T0, T1]: // pid = 1001, tid = 1, #PF = 50 From 38cd7f4ea4e7d7a6a649a4c962fd42535d433431 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 13 Jan 2023 16:57:09 -0800 Subject: [PATCH 13/15] Clarify naming and documentation --- sdk/metric/internal/aggregator.go | 23 +++++++++- sdk/metric/internal/filter.go | 51 +++++++++++++-------- sdk/metric/internal/filter_test.go | 2 +- sdk/metric/internal/sum.go | 72 +++++++++++++++++++++++------- sdk/metric/internal/sum_test.go | 24 +++++----- 5 files changed, 122 insertions(+), 50 deletions(-) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 952e9a4a8bd..87b8dd0d357 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -27,8 +27,8 @@ var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. // -// Aggregators need to be comparable so they can be de-duplicated by the SDK when -// it creates them for multiple views. +// Aggregators need to be comparable so they can be de-duplicated by the SDK +// when it creates them for multiple views. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. @@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface { // measurements made and ends an aggregation cycle. Aggregation() metricdata.Aggregation } + +// precomputeAggregator is an Aggregator that recieves values to aggregate that +// have been pre-computed by the caller. +type precomputeAggregator[N int64 | float64] interface { + // The Aggregate method of the embedded Aggregator is used to record + // pre-computed measurements, scoped by attributes that have not been + // filtered by an attribute filter. + Aggregator[N] + + // aggregateFiltered records measurements scoped by attributes that have + // been filtered by an attribute filter. + // + // Pre-computed measurements of filtered attributes need to be recorded + // separate from those that haven't been filtered so they can be added to + // the non-filtered pre-computed measurements in a collection cycle and + // then resest after the cycle (the non-filtered pre-computed measurements + // are not reset). + aggregateFiltered(N, attribute.Set) +} diff --git a/sdk/metric/internal/filter.go b/sdk/metric/internal/filter.go index a08c61e3d82..4d24b62819a 100644 --- a/sdk/metric/internal/filter.go +++ b/sdk/metric/internal/filter.go @@ -21,26 +21,26 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -type filterAgg[N int64 | float64] interface { - Aggregator[N] - - // filtered records values for attributes that have been filtered. 
-	filtered(N, attribute.Set)
-}
-
-// NewFilter wraps an Aggregator with an attribute filtering function.
+// NewFilter returns an Aggregator that wraps an agg with an attribute
+// filtering function. Both pre-computed and non-pre-computed Aggregators can
+// be passed for agg. An appropriate Aggregator will be returned for the
+// detected type.
 func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
 	if fn == nil {
 		return agg
 	}
-	if fa, ok := agg.(filterAgg[N]); ok {
+	if fa, ok := agg.(precomputeAggregator[N]); ok {
 		return newPrecomputedFilter(fa, fn)
 	}
 	return newFilter(agg, fn)
 }
 
-// filter is an aggregator that applies attribute filter when Aggregating. filters
-// do not have any backing memory, and must be constructed with a backing Aggregator.
+// filter wraps an aggregator with an attribute filter. All recorded
+// measurements will have their attributes filtered before they are passed to
+// the underlying aggregator's Aggregate method.
+//
+// This should not be used to wrap a pre-computed Aggregator. Use a
+// precomputedFilter instead.
 type filter[N int64 | float64] struct {
 	filter     attribute.Filter
 	aggregator Aggregator[N]
@@ -49,6 +49,11 @@ type filter[N int64 | float64] struct {
 	seen map[attribute.Set]attribute.Set
 }
 
+// newFilter returns a filter Aggregator that wraps agg with the attribute
+// filter fn.
+//
+// This should not be used to wrap a pre-computed Aggregator. Use a
+// precomputedFilter instead.
 func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
 	return &filter[N]{
 		filter:     fn,
@@ -78,25 +83,33 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
 }
 
 // precomputedFilter is an aggregator that applies attribute filter when
-// Aggregating for precomputed Aggregations. The precomputed Aggregations need
-// to operate normally when no attribute filtering is done (for sums this means
-// setting the value), but when attribute filtering is done it needs to be
-// added to any set value.
+// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
+// need to operate normally when no attribute filtering is done (for sums this
+// means setting the value), but when attribute filtering is done it needs to
+// be added to any set value.
 type precomputedFilter[N int64 | float64] struct {
 	filter     attribute.Filter
-	aggregator filterAgg[N]
+	aggregator precomputeAggregator[N]
 
 	sync.Mutex
 	seen map[attribute.Set]attribute.Set
 }
 
-func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] {
+// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
+// with the attribute filter fn.
+//
+// This should not be used to wrap a non-pre-computed Aggregator. Use a
+// filter instead.
+func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
	return &precomputedFilter[N]{
 		filter:     fn,
 		aggregator: agg,
 		seen:       make(map[attribute.Set]attribute.Set),
 	}
 }
+
+// Aggregate records the measurement, scoped by attr, and aggregates it
+// into an aggregation.
 func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
 	// TODO (#3006): drop stale attributes from seen.
 	f.Lock()
@@ -110,10 +123,12 @@ func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
 		// No filtering done.
 		f.aggregator.Aggregate(measurement, fAttr)
 	} else {
-		f.aggregator.filtered(measurement, fAttr)
+		f.aggregator.aggregateFiltered(measurement, fAttr)
 	}
 }
 
+// Aggregation returns an Aggregation, for all the aggregated
+// measurements made and ends an aggregation cycle.
 func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
 	return f.aggregator.Aggregation()
 }
diff --git a/sdk/metric/internal/filter_test.go b/sdk/metric/internal/filter_test.go
index 1fd8de240d7..e1333c1d4d1 100644
--- a/sdk/metric/internal/filter_test.go
+++ b/sdk/metric/internal/filter_test.go
@@ -273,7 +273,7 @@ func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
 }
 
 // nolint: unused // Used to agg filtered.
-func (a *testFilterAgg[N]) filtered(val N, attr attribute.Set) {
+func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
 	v := a.values[attr]
 	v.filtered += val
 	a.values[attr] = v
diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go
index 2fff3ba00d8..7783dc66490 100644
--- a/sdk/metric/internal/sum.go
+++ b/sdk/metric/internal/sum.go
@@ -158,14 +158,17 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
 	return out
 }
 
+// precomputedValue is the recorded measurement value for a set of attributes.
 type precomputedValue[N int64 | float64] struct {
-	// measured is the value directly measured.
+	// measured is the last value measured for a set of attributes that were
+	// not filtered.
 	measured N
-	// filtered is the sum of values from spatially aggregations.
+	// filtered is the sum of values from measurements that had their
+	// attributes filtered.
 	filtered N
 }
 
-// valueMap is the storage for precomputed sums.
+// precomputedMap is the storage for precomputed sums.
 type precomputedMap[N int64 | float64] struct {
 	sync.Mutex
 	values map[attribute.Set]precomputedValue[N]
@@ -177,7 +180,14 @@ func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
 	}
 }
 
-// Aggregate records value as a cumulative sum for attr.
+// Aggregate records value with the unfiltered attributes attr.
+//
+// If a previous measurement was made for the same attribute set:
+//
+// - If that measurement's attributes were not filtered, this value overwrites
+//   that value.
+// - If that measurement's attributes were filtered, this value will be
+//   recorded alongside that value.
 func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
 	s.Lock()
 	v := s.values[attr]
@@ -186,8 +196,18 @@ func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
 	s.Unlock()
 }
 
-// filtered records value with spatially re-aggregated attrs.
-func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
+// aggregateFiltered records value with the filtered attributes attr.
+//
+// If a previous measurement was made for the same attribute set:
+//
+// - If that measurement's attributes were not filtered, this value will be
+//   recorded alongside that value.
+// - If that measurement's attributes were filtered, this value will be
+//   added to it.
+//
+// This method should not be used if attr has not been reduced by an attribute
+// filter.
+func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
 	s.Lock()
 	v := s.values[attr]
 	v.filtered += value
@@ -196,15 +216,14 @@ func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: u
 }
 
 // NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
-// measurements as their pre-computed arithmetic sum. Each sum is scoped by
-// attributes and the aggregation cycle the measurements were made in.
+// pre-computed sums. Each sum is scoped by attributes and the aggregation
+// cycle the measurements were made in.
 //
 // The monotonic value is used to communicate the produced Aggregation is
 // monotonic or not. The returned Aggregator does not make any guarantees this
 // value is accurate. It is up to the caller to ensure it.
 //
-// The output Aggregation will report recorded values as delta temporality. It
-// is up to the caller to ensure this is accurate.
+// The output Aggregation will report recorded values as delta temporality.
 func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
 	return &precomputedDeltaSum[N]{
 		precomputedMap: newPrecomputedMap[N](),
@@ -214,8 +233,8 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
 	}
 }
 
-// precomputedDeltaSum summarizes a set of measurements recorded over all
-// aggregation cycles as the delta arithmetic sum.
+// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
+// aggregation cycles as the delta of these sums.
 type precomputedDeltaSum[N int64 | float64] struct {
 	*precomputedMap[N]
 
@@ -225,6 +244,16 @@ type precomputedDeltaSum[N int64 | float64] struct {
 	start time.Time
 }
 
+// Aggregation returns the recorded pre-computed sums as an Aggregation. The
+// sum values are expressed as the delta between what was measured this
+// collection cycle and the previous.
+//
+// All pre-computed sums that were recorded for attribute sets reduced by an
+// attribute filter (filtered-sums) are summed together and added to any
+// pre-computed sum value recorded directly for the resulting attribute set
+// (unfiltered-sum). The filtered-sums are reset to zero for the next
+// collection cycle, and the unfiltered-sum is kept for the next collection
+// cycle.
 func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 	s.Lock()
 	defer s.Unlock()
@@ -264,15 +293,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 }
 
 // NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
-// measurements as their pre-computed arithmetic sum. Each sum is scoped by
-// attributes and the aggregation cycle the measurements were made in.
+// pre-computed sums. Each sum is scoped by attributes and the aggregation
+// cycle the measurements were made in.
 //
 // The monotonic value is used to communicate the produced Aggregation is
 // monotonic or not. The returned Aggregator does not make any guarantees this
 // value is accurate. It is up to the caller to ensure it.
 //
 // The output Aggregation will report recorded values as cumulative
-// temporality. It is up to the caller to ensure this is accurate.
+// temporality.
 func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
 	return &precomputedCumulativeSum[N]{
 		precomputedMap: newPrecomputedMap[N](),
@@ -281,8 +310,7 @@ func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N
 	}
 }
 
-// precomputedCumulativeSum summarizes a set of measurements recorded over all
-// aggregation cycles directly as the cumulative arithmetic sum.
+// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
 type precomputedCumulativeSum[N int64 | float64] struct {
 	*precomputedMap[N]
 
@@ -290,6 +318,16 @@ type precomputedCumulativeSum[N int64 | float64] struct {
 	start time.Time
 }
 
+// Aggregation returns the recorded pre-computed sums as an Aggregation. The
+// sum values are expressed directly as they are assumed to be recorded as the
+// cumulative sum of some measured phenomenon.
+//
+// All pre-computed sums that were recorded for attribute sets reduced by an
+// attribute filter (filtered-sums) are summed together and added to any
+// pre-computed sum value recorded directly for the resulting attribute set
+// (unfiltered-sum). The filtered-sums are reset to zero for the next
+// collection cycle, and the unfiltered-sum is kept for the next collection
+// cycle.
 func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
 	s.Lock()
 	defer s.Unlock()
diff --git a/sdk/metric/internal/sum_test.go b/sdk/metric/internal/sum_test.go
index b3e18d764da..cde79aaa92b 100644
--- a/sdk/metric/internal/sum_test.go
+++ b/sdk/metric/internal/sum_test.go
@@ -167,7 +167,7 @@ func TestDeltaSumReset(t *testing.T) {
 func TestPreComputedDeltaSum(t *testing.T) {
 	var mono bool
 	agg := NewPrecomputedDeltaSum[int64](mono)
-	require.Implements(t, (*filterAgg[int64])(nil), agg)
+	require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
 
 	attrs := attribute.NewSet(attribute.String("key", "val"))
 	agg.Aggregate(1, attrs)
@@ -185,7 +185,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
 	want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
 	metricdatatest.AssertAggregationsEqual(t, want, got, opt)
 
-	agg.(filterAgg[int64]).filtered(1, attrs)
+	agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
 	got = agg.Aggregation()
 	// measured(+): 1, previous(-): 1, filtered(+): 1
 	want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
@@ -205,8 +205,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
 	agg.Aggregate(2, attrs)
 	agg.Aggregate(5, attrs)
 	// Filtered should add.
-	agg.(filterAgg[int64]).filtered(3, attrs)
-	agg.(filterAgg[int64]).filtered(10, attrs)
+	agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
+	agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
 	got = agg.Aggregation()
 	// measured(+): 5, previous(-): 1, filtered(+): 13
 	want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
@@ -221,9 +221,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
 
 	// Order should not affect measure.
 	// Filtered should add.
- agg.(filterAgg[int64]).filtered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.Aggregate(7, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() // measured(+): 7, previous(-): 5, filtered(+): 13 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} @@ -238,7 +238,7 @@ func TestPreComputedDeltaSum(t *testing.T) { func TestPreComputedCumulativeSum(t *testing.T) { var mono bool agg := NewPrecomputedCumulativeSum[int64](mono) - require.Implements(t, (*filterAgg[int64])(nil), agg) + require.Implements(t, (*precomputeAggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) @@ -255,7 +255,7 @@ func TestPreComputedCumulativeSum(t *testing.T) { got = agg.Aggregation() metricdatatest.AssertAggregationsEqual(t, want, got, opt) - agg.(filterAgg[int64]).filtered(1, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) @@ -268,8 +268,8 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Override set value. agg.Aggregate(5, attrs) // Filtered should add. - agg.(filterAgg[int64]).filtered(3, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) @@ -281,9 +281,9 @@ func TestPreComputedCumulativeSum(t *testing.T) { // Order should not affect measure. // Filtered should add. - agg.(filterAgg[int64]).filtered(3, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) agg.Aggregate(7, attrs) - agg.(filterAgg[int64]).filtered(10, attrs) + agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) got = agg.Aggregation() want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} metricdatatest.AssertAggregationsEqual(t, want, got, opt) From 8ec850ab35d040f268eb119e7345ada15cd6541d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Sat, 14 Jan 2023 07:45:53 -0800 Subject: [PATCH 14/15] Fix spelling errors --- sdk/metric/internal/aggregator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 87b8dd0d357..42694d87020 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -22,7 +22,7 @@ import ( ) // now is used to return the current local time while allowing tests to -// override the the default time.Now function. +// override the default time.Now function. var now = time.Now // Aggregator forms an aggregation from a collection of recorded measurements. @@ -39,7 +39,7 @@ type Aggregator[N int64 | float64] interface { Aggregation() metricdata.Aggregation } -// precomputeAggregator is an Aggregator that recieves values to aggregate that +// precomputeAggregator is an Aggregator that receives values to aggregate that // have been pre-computed by the caller. 
 type precomputeAggregator[N int64 | float64] interface {
 	// The Aggregate method of the embedded Aggregator is used to record
 	// pre-computed measurements, scoped by attributes that have not been
 	// filtered by an attribute filter.
@@ -53,7 +53,7 @@ type precomputeAggregator[N int64 | float64] interface {
 	// Pre-computed measurements of filtered attributes need to be recorded
 	// separate from those that haven't been filtered so they can be added to
 	// the non-filtered pre-computed measurements in a collection cycle and
-	// then resest after the cycle (the non-filtered pre-computed measurements
+	// then reset after the cycle (the non-filtered pre-computed measurements
 	// are not reset).
 	aggregateFiltered(N, attribute.Set)
 }

From af451873e7aa903b86428e3d9b2b126ada4f572a Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Tue, 17 Jan 2023 09:34:43 -0800
Subject: [PATCH 15/15] Add a noop filter to default view

---
 sdk/metric/meter_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go
index cfe8accd945..76a0bf8ec10 100644
--- a/sdk/metric/meter_test.go
+++ b/sdk/metric/meter_test.go
@@ -1071,7 +1071,8 @@ func TestAsynchronousExample(t *testing.T) {
 		selector := func(InstrumentKind) metricdata.Temporality { return temp }
 		reader := NewManualReader(WithTemporalitySelector(selector))
 
-		noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName})
+		noopFilter := func(kv attribute.KeyValue) bool { return true }
+		noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter})
 
 		filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" }
 		filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})
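The delta merge rule these patches document (see the precomputedMap and precomputedDeltaSum comments in PATCH 13) is compact enough to model outside the SDK. The following is a minimal, self-contained Go sketch, not the SDK's internal implementation: the names (pre, deltaSum, collect) and the string-keyed attribute sets are hypothetical stand-ins. It assumes only the rule stated above: an unfiltered pre-computed value overwrites, filtered values accumulate, collection reports (measured + filtered) minus the previously reported total, and only the filtered portion resets each cycle. Run as written, it reproduces the filteredPageFaults sequence asserted in TestAsynchronousExample's Delta case (80, 11, 7, 9, -49), including the final -49 once thread 1 stops reporting.

package main

import "fmt"

// pre mirrors precomputedValue: the last pre-computed measurement recorded
// with unfiltered attributes, plus the running sum of measurements whose
// distinguishing attributes were dropped by a filter.
type pre struct {
	measured int64
	filtered int64
}

// deltaSum sketches the precomputedDeltaSum collection rule: report
// (measured + filtered) minus what was last reported, remember the new
// total, and reset only the filtered portion.
type deltaSum struct {
	values   map[string]pre
	reported map[string]int64
}

func newDeltaSum() *deltaSum {
	return &deltaSum{values: map[string]pre{}, reported: map[string]int64{}}
}

// aggregate records a pre-computed value whose attributes were not filtered.
// It is unused in this demo because every observation below loses its tid.
func (s *deltaSum) aggregate(v int64, attr string) {
	p := s.values[attr]
	p.measured = v
	s.values[attr] = p
}

// aggregateFiltered adds a pre-computed value whose attributes were filtered.
func (s *deltaSum) aggregateFiltered(v int64, attr string) {
	p := s.values[attr]
	p.filtered += v
	s.values[attr] = p
}

// collect ends a cycle and returns the delta for each attribute set.
func (s *deltaSum) collect() map[string]int64 {
	out := make(map[string]int64, len(s.values))
	for attr, p := range s.values {
		total := p.measured + p.filtered
		out[attr] = total - s.reported[attr]
		s.reported[attr] = total
		p.filtered = 0 // filtered sums reset each cycle; measured is kept
		s.values[attr] = p
	}
	return out
}

func main() {
	s := newDeltaSum()

	// (T0, T1]: tid=1 reads 50, tid=2 reads 30; both filter to pid=1001.
	s.aggregateFiltered(50, "pid=1001")
	s.aggregateFiltered(30, "pid=1001")
	fmt.Println(s.collect()) // map[pid=1001:80]

	// (T1, T2]: the counters now read 53 and 38.
	s.aggregateFiltered(53, "pid=1001")
	s.aggregateFiltered(38, "pid=1001")
	fmt.Println(s.collect()) // map[pid=1001:11]

	// (T2, T3]: 56 and 42.
	s.aggregateFiltered(56, "pid=1001")
	s.aggregateFiltered(42, "pid=1001")
	fmt.Println(s.collect()) // map[pid=1001:7]

	// (T3, T4]: 60 and 47.
	s.aggregateFiltered(60, "pid=1001")
	s.aggregateFiltered(47, "pid=1001")
	fmt.Println(s.collect()) // map[pid=1001:9]

	// (T4, T5]: thread 1 died, thread 3 started; tid=2 reads 53, tid=3 reads 5.
	s.aggregateFiltered(53, "pid=1001")
	s.aggregateFiltered(5, "pid=1001")
	fmt.Println(s.collect()) // map[pid=1001:-49], as asserted in the test
}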