Skip to content

Commit

Permalink
No memory leakage in attributes filter (#3695)
Browse files Browse the repository at this point in the history
The attributes filter collects seen attributes in order to avoid
filtration on the same attribute set. However, the `attribute.Set` is
not comparable type and new allocations of sets with same attributes will be
considered as new sets.

Metrics with a high cardinality of attributes consume a lot of memory
even if we set a filter to reduce that cardinality.

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
3 people committed Feb 13, 2023
1 parent 68aa598 commit 441a173
Showing 1 changed file with 2 additions and 26 deletions.
28 changes: 2 additions & 26 deletions sdk/metric/internal/filter.go
Expand Up @@ -15,8 +15,6 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -44,9 +42,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newFilter returns an filter Aggregator that wraps agg with the attribute
Expand All @@ -58,21 +53,13 @@ func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filte
return &filter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
f.aggregator.Aggregate(measurement, fAttr)
}

Expand All @@ -90,9 +77,6 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
Expand All @@ -104,21 +88,13 @@ func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn att
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
Expand Down

0 comments on commit 441a173

Please sign in to comment.