Skip to content

Commit

Permalink
Remove metric aggregator Subtract interface (#2350)
Browse files Browse the repository at this point in the history
* Remove metric aggregator Subtract interface

* Apply suggestions from code review

Co-authored-by: Georg Pirklbauer <georg.pirklbauer@dynatrace.com>

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* make generate

* update changelog

Co-authored-by: Georg Pirklbauer <georg.pirklbauer@dynatrace.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
3 people committed Nov 15, 2021
1 parent 4077cac commit b8ae272
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 159 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Removed

- Remove the metric Processor's ability to convert cumulative to delta aggregation temporality. (#2350)

## [1.2.0] - 2021-11-12

### Changed
Expand Down
5 changes: 4 additions & 1 deletion sdk/export/metric/aggregation/aggregation.go
Expand Up @@ -125,7 +125,10 @@ var (
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
ErrNoSubtraction = fmt.Errorf("aggregator does not subtract")

// ErrNoCumulativeToDelta is returned when requesting delta
// export kind for a precomputed sum instrument.
ErrNoCumulativeToDelta = fmt.Errorf("cumulative to delta not implemented")

// ErrNoData is returned when (due to a race with collection)
// the Aggregator is check-pointed before the first value is set.
Expand Down
10 changes: 0 additions & 10 deletions sdk/export/metric/metric.go
Expand Up @@ -193,16 +193,6 @@ type Aggregator interface {
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}

// Subtractor is an optional interface implemented by some
// Aggregators. An Aggregator must support `Subtract()` in order to
// be configured for a Precomputed-Sum instrument (CounterObserver,
// UpDownCounterObserver) using a DeltaExporter.
type Subtractor interface {
// Subtract subtracts the `operand` from this Aggregator and
// outputs the value in `result`.
Subtract(operand, result Aggregator, descriptor *sdkapi.Descriptor) error
}

// Exporter handles presentation of the checkpoint of aggregate
// metrics. This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system.
Expand Down
17 changes: 0 additions & 17 deletions sdk/metric/aggregator/sum/sum.go
Expand Up @@ -32,7 +32,6 @@ type Aggregator struct {
}

var _ export.Aggregator = &Aggregator{}
var _ export.Subtractor = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}

// New returns a new counter aggregator implemented by atomic
Expand Down Expand Up @@ -88,19 +87,3 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error
c.value.AddNumber(desc.NumberKind(), o.value)
return nil
}

func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *sdkapi.Descriptor) error {
op, _ := opAgg.(*Aggregator)
if op == nil {
return aggregator.NewInconsistentAggregatorError(c, opAgg)
}

res, _ := resAgg.(*Aggregator)
if res == nil {
return aggregator.NewInconsistentAggregatorError(c, resAgg)
}

res.value = c.value
res.value.AddNumber(descriptor.NumberKind(), number.NewNumberSignChange(descriptor.NumberKind(), op.value))
return nil
}
77 changes: 31 additions & 46 deletions sdk/metric/processor/basic/basic.go
Expand Up @@ -76,11 +76,6 @@ type (
// values in a single collection round.
current export.Aggregator

// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator

// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
Expand All @@ -94,9 +89,6 @@ type (
sync.RWMutex
values map[stateKey]*stateValue

// Note: the timestamp logic currently assumes all
// exports are deltas.

processStart time.Time
intervalStart time.Time
intervalEnd time.Time
Expand Down Expand Up @@ -124,8 +116,8 @@ var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality")
// New returns a basic Processor that is also a Checkpointer using the provided
// AggregatorSelector to select Aggregators. The TemporalitySelector
// is consulted to determine the kind(s) of exporter that will consume
// data, so that this Processor can prepare to compute Delta or
// Cumulative Aggregations as needed.
// data, so that this Processor can prepare to compute Cumulative Aggregations
// as needed.
func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor {
return NewFactory(aselector, tselector, opts...).NewCheckpointer().(*Processor)
}
Expand Down Expand Up @@ -191,13 +183,17 @@ func (b *Processor) Process(accum export.Accumulation) error {
}
if stateful {
if desc.InstrumentKind().PrecomputedSum() {
// If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else {
// In this case we are certain not to need a delta, only allocate
// a cumulative aggregator.
b.AggregatorFor(desc, &newValue.cumulative)
// To convert precomputed sums to
// deltas requires two aggregators to
// be allocated, one for the prior
// value and one for the output delta.
// This functionality was removed from
// the basic processor in PR #2350.
return aggregation.ErrNoCumulativeToDelta
}
// In this case allocate one aggregator to
// save the current state.
b.AggregatorFor(desc, &newValue.cumulative)
}
b.state.values[key] = newValue
return nil
Expand Down Expand Up @@ -310,28 +306,15 @@ func (b *Processor) FinishCollection() error {
continue
}

// Update Aggregator state to support exporting either a
// delta or a cumulative aggregation.
var err error
if mkind.PrecomputedSum() {
if currentSubtractor, ok := value.current.(export.Subtractor); ok {
// This line is equivalent to:
// value.delta = currentSubtractor - value.cumulative
err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)

if err == nil {
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
}
} else {
err = aggregation.ErrNoSubtraction
}
} else {
// The only kind of aggregators that are not stateless
// are the ones needing delta to cumulative
// conversion. Merge aggregator state in this case.
if !mkind.PrecomputedSum() {
// This line is equivalent to:
// value.cumulative = value.cumulative + value.delta
err = value.cumulative.Merge(value.current, key.descriptor)
}
if err != nil {
return err
// value.cumulative = value.cumulative + value.current
if err := value.cumulative.Merge(value.current, key.descriptor); err != nil {
return err
}
}
}
return nil
Expand All @@ -350,13 +333,8 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
var agg aggregation.Aggregation
var start time.Time

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind())

switch aggTemp {
case aggregation.CumulativeTemporality:
// If stateful, the sum has been computed. If stateless, the
Expand All @@ -372,16 +350,23 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
case aggregation.DeltaTemporality:
// Precomputed sums are a special case.
if mkind.PrecomputedSum() {
agg = value.delta.Aggregation()
} else {
agg = value.current.Aggregation()
// This functionality was removed from
// the basic processor in PR #2350.
return aggregation.ErrNoCumulativeToDelta
}
agg = value.current.Aggregation()
start = b.intervalStart

default:
return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality)
}

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

if err := f(export.NewRecord(
key.descriptor,
value.labels,
Expand Down

0 comments on commit b8ae272

Please sign in to comment.