Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No memory leakage in attributes filter #3695

Merged
merged 5 commits into from Feb 13, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 2 additions & 26 deletions sdk/metric/internal/filter.go
Expand Up @@ -15,8 +15,6 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand Down Expand Up @@ -44,9 +42,6 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newFilter returns an filter Aggregator that wraps agg with the attribute
Expand All @@ -58,21 +53,13 @@ func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filte
return &filter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
f.aggregator.Aggregate(measurement, fAttr)
}

Expand All @@ -90,9 +77,6 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
Expand All @@ -104,21 +88,13 @@ func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn att
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
fAttr, _ := attr.Filter(f.filter)
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
Expand Down