Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove metric aggregator Subtract interface #2350

Merged
merged 9 commits into from Nov 15, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -23,6 +23,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added a new `schema` module to help parse Schema Files in OTEP 0152 format. (#2267)
- Added a new `MapCarrier` to the `go.opentelemetry.io/otel/propagation` package to hold propagated coss-cutting concerns as a `map[string]string` held in memory. (#2334)

### Removed

- Remove the metric Processor's ability to convert cumulative to delta aggregation temporality.
jmacd marked this conversation as resolved.
Show resolved Hide resolved

## [1.1.0] - 2021-10-27

### Added
Expand Down
2 changes: 1 addition & 1 deletion attribute/type_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion metric/number/kind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion metric/sdkapi/instrumentkind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion sdk/export/metric/aggregation/aggregation.go
Expand Up @@ -142,7 +142,10 @@ var (
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
ErrNoSubtraction = fmt.Errorf("aggregator does not subtract")

// ErrNoCumulativeToDelta is returned when requesting delta
// export kind for a precomputed sum instrument.
ErrNoCumulativeToDelta = fmt.Errorf("cumulative to delta not implemented")

// ErrNoData is returned when (due to a race with collection)
// the Aggregator is check-pointed before the first value is set.
Expand Down
2 changes: 1 addition & 1 deletion sdk/export/metric/aggregation/temporality_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 0 additions & 10 deletions sdk/export/metric/metric.go
Expand Up @@ -193,16 +193,6 @@ type Aggregator interface {
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}

// Subtractor is an optional interface implemented by some
// Aggregators. An Aggregator must support `Subtract()` in order to
// be configured for a Precomputed-Sum instrument (CounterObserver,
// UpDownCounterObserver) using a DeltaExporter.
type Subtractor interface {
// Subtract subtracts the `operand` from this Aggregator and
// outputs the value in `result`.
Subtract(operand, result Aggregator, descriptor *sdkapi.Descriptor) error
}

// Exporter handles presentation of the checkpoint of aggregate
// metrics. This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system.
Expand Down
17 changes: 0 additions & 17 deletions sdk/metric/aggregator/sum/sum.go
Expand Up @@ -32,7 +32,6 @@ type Aggregator struct {
}

var _ export.Aggregator = &Aggregator{}
var _ export.Subtractor = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}

// New returns a new counter aggregator implemented by atomic
Expand Down Expand Up @@ -88,19 +87,3 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error
c.value.AddNumber(desc.NumberKind(), o.value)
return nil
}

func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *sdkapi.Descriptor) error {
op, _ := opAgg.(*Aggregator)
if op == nil {
return aggregator.NewInconsistentAggregatorError(c, opAgg)
}

res, _ := resAgg.(*Aggregator)
if res == nil {
return aggregator.NewInconsistentAggregatorError(c, resAgg)
}

res.value = c.value
res.value.AddNumber(descriptor.NumberKind(), number.NewNumberSignChange(descriptor.NumberKind(), op.value))
return nil
}
77 changes: 31 additions & 46 deletions sdk/metric/processor/basic/basic.go
Expand Up @@ -76,11 +76,6 @@ type (
// values in a single collection round.
current export.Aggregator

// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator

// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
Expand All @@ -94,9 +89,6 @@ type (
sync.RWMutex
values map[stateKey]*stateValue

// Note: the timestamp logic currently assumes all
// exports are deltas.

processStart time.Time
intervalStart time.Time
intervalEnd time.Time
Expand Down Expand Up @@ -124,8 +116,8 @@ var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality")
// New returns a basic Processor that is also a Checkpointer using the provided
// AggregatorSelector to select Aggregators. The TemporalitySelector
// is consulted to determine the kind(s) of exporter that will consume
// data, so that this Processor can prepare to compute Delta or
// Cumulative Aggregations as needed.
// data, so that this Processor can prepare to compute Cumulative Aggregations
// as needed.
func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor {
return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor)
}
Expand Down Expand Up @@ -191,13 +183,17 @@ func (b *Processor) Process(accum export.Accumulation) error {
}
if stateful {
if desc.InstrumentKind().PrecomputedSum() {
// If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else {
// In this case we are certain not to need a delta, only allocate
// a cumulative aggregator.
b.AggregatorFor(desc, &newValue.cumulative)
// To convert precomputed sums to
// deltas requires two aggregators to
// be allocated, one for the prior
// value and one for the output delta.
// This functionality was removed from
// the basic processor in PR #xxxx.
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return aggregation.ErrNoCumulativeToDelta
}
// In this case allocate one aggregator to
// save the current state.
b.AggregatorFor(desc, &newValue.cumulative)
}
b.state.values[key] = newValue
return nil
Expand Down Expand Up @@ -310,28 +306,15 @@ func (b *Processor) FinishCollection() error {
continue
}

// Update Aggregator state to support exporting either a
// delta or a cumulative aggregation.
var err error
if mkind.PrecomputedSum() {
if currentSubtractor, ok := value.current.(export.Subtractor); ok {
// This line is equivalent to:
// value.delta = currentSubtractor - value.cumulative
err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)

if err == nil {
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
}
} else {
err = aggregation.ErrNoSubtraction
}
} else {
// The only kind of aggregators that are not stateless
// are the ones needing delta to cumulative
// conversion. Merge aggregator state in this case.
if !mkind.PrecomputedSum() {
// This line is equivalent to:
// value.cumulative = value.cumulative + value.delta
err = value.cumulative.Merge(value.current, key.descriptor)
}
if err != nil {
return err
// value.cumulative = value.cumulative + value.current
if err := value.cumulative.Merge(value.current, key.descriptor); err != nil {
return err
}
}
}
return nil
Expand All @@ -350,13 +333,8 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
var agg aggregation.Aggregation
var start time.Time

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind())

switch aggTemp {
case aggregation.CumulativeTemporality:
// If stateful, the sum has been computed. If stateless, the
Expand All @@ -372,16 +350,23 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
case aggregation.DeltaTemporality:
// Precomputed sums are a special case.
if mkind.PrecomputedSum() {
agg = value.delta.Aggregation()
} else {
agg = value.current.Aggregation()
// This functionality was removed from
// the basic processor in PR #xxxx.
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return aggregation.ErrNoCumulativeToDelta
}
agg = value.current.Aggregation()
start = b.intervalStart

default:
return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality)
}

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

if err := f(export.NewRecord(
key.descriptor,
value.labels,
Expand Down