Skip to content

Commit

Permalink
Add internal package structure for aggregation (#2954)
Browse files Browse the repository at this point in the history
* Add the aggtor package

* Restrict to Go 1.18

* Add missing build block to view_test.go

* Comment Aggregator iface

* Use Go 1.18 as the default ci version

* Update Aggregator iface from feedback

* Accept hist conf

* Flatten aggtor into just internal

* Add Cycler interface

Separate the duties of aggregation and maintaining state across
aggregation periods.

* Remove build flags for doc.go

* Clarify Cycler documentation

* Remove aggregation fold logic

* Rename Number to Atomic

* Add tests for Atomic impls

* Remove unneeded Atomic implementation

Add back when filling in structures.

* Fix article in Float64 docs

* Remove Atomic

This is an implementation detail.

* Add aggregator_example_test

* Fix hist example

* Add issue numbers to all TODO and FIXME

* Remove zero parameter comment

* Combine the cycler into the aggregators

* Remove the drop aggregator

* Fix lint

* Use attribute.Set instead of ptr to it

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
  • Loading branch information
MrAlias and Aneurysm9 committed Jul 6, 2022
1 parent e7f2894 commit c250227
Show file tree
Hide file tree
Showing 29 changed files with 470 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Expand Up @@ -8,7 +8,7 @@ env:
# Path to where test results will be saved.
TEST_RESULTS: /tmp/test-results
# Default minimum version of Go to support.
DEFAULT_GO_VERSION: 1.17
DEFAULT_GO_VERSION: 1.18
jobs:
lint:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/config.go
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/config_test.go
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric

Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/export/data.go
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

// TODO: NOTE this is a temporary space, it may be moved following the
// discussion of #2813, or #2841
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/exporter.go
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/go.mod
@@ -1,6 +1,6 @@
module go.opentelemetry.io/otel/sdk/metric

go 1.17
go 1.18

require (
github.com/go-logr/logr v1.2.3
Expand Down
1 change: 0 additions & 1 deletion sdk/metric/go.sum
Expand Up @@ -6,7 +6,6 @@ github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/instrumentkind.go
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17
//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/internal/aggregation.go
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"go.opentelemetry.io/otel/attribute"
)

// Aggregation is a single data point in a timeseries that summarizes
// measurements made during a time span.
type Aggregation struct {
// TODO(#2968): Replace this with the export.Aggregation type once #2961
// is merged.

// Timestamp defines the time the last measurement was made. If zero, no
// measurements were made for this time span. The time is represented as a
// unix timestamp with nanosecond precision.
Timestamp uint64

// Attributes are the unique dimensions Value describes.
Attributes attribute.Set

// Value is the summarization of the measurements made.
Value value
}

type value interface {
private()
}

// SingleValue summarizes a set of measurements as a single value.
type SingleValue[N int64 | float64] struct {
Value N
}

func (SingleValue[N]) private() {}

// HistogramValue summarizes a set of measurements as a histogram.
type HistogramValue struct {
Bounds []float64
Counts []uint64
Sum float64
Min, Max float64
}

func (HistogramValue) private() {}
31 changes: 31 additions & 0 deletions sdk/metric/internal/aggregator.go
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import "go.opentelemetry.io/otel/attribute"

// Aggregator forms an aggregation from a collection of recorded measurements.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Aggregate(measurement N, attr attribute.Set)

// Aggregations returns a slice of Aggregation, one per each attribute set
// used to scope measurement aggregation, and ends an aggregation cycle.
Aggregations() []Aggregation
}
122 changes: 122 additions & 0 deletions sdk/metric/internal/aggregator_example_test.go
@@ -0,0 +1,122 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

type meter struct {
// When a reader initiates a collection, the meter would collect
// aggregations from each of these functions. In this process they will
// progress the aggregation period of each instrument's aggregator.
aggregationFuncs []func() []Aggregation
}

func (m *meter) SyncInt64() syncint64.InstrumentProvider {
// The same would be done for all the other instrument providers.
return (*syncInt64Provider)(m)
}

type syncInt64Provider meter

func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Counter, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator for a new counter. At this point the provider would
// determine the aggregation and temporality to used based on the Reader
// and View configuration. Assume here these are determined to be a
// cumulative sum.

aggregator := NewCumulativeSum[int64]()
count := inst{aggregateFunc: aggregator.Aggregate}

p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations)

fmt.Printf("using %T aggregator for counter\n", aggregator)

return count, nil
}

func (p *syncInt64Provider) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator for a new up-down counter. At this point the provider would
// determine the aggregation and temporality to used based on the Reader
// and View configuration. Assume here these are determined to be a
// last-value aggregation (the temporality does not affect the produced
// aggregations).

aggregator := NewLastValue[int64]()
upDownCount := inst{aggregateFunc: aggregator.Aggregate}

p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations)

fmt.Printf("using %T aggregator for up-down counter\n", aggregator)

return upDownCount, nil
}

func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) {
// This is an example of how a synchronous int64 provider would create an
// aggregator for a new histogram. At this point the provider would
// determine the aggregation and temporality to used based on the Reader
// and View configuration. Assume here these are determined to be a delta
// explicit-bucket histogram.

aggregator := NewDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
NoMinMax: false,
})
hist := inst{aggregateFunc: aggregator.Aggregate}

p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations)

fmt.Printf("using %T aggregator for histogram\n", aggregator)

return hist, nil
}

// inst is a generalized int64 synchronous counter, up-down counter, and
// histogram used for demonstration purposes only.
type inst struct {
instrument.Synchronous

aggregateFunc func(int64, attribute.Set)
}

func (inst) Add(context.Context, int64, ...attribute.KeyValue) {}
func (inst) Record(context.Context, int64, ...attribute.KeyValue) {}

func Example() {
m := meter{}
provider := m.SyncInt64()

_, _ = provider.Counter("counter example")
_, _ = provider.UpDownCounter("up-down counter example")
_, _ = provider.Histogram("histogram example")

// Output:
// using *internal.cumulativeSum[int64] aggregator for counter
// using *internal.lastValue[int64] aggregator for up-down counter
// using *internal.deltaHistogram[int64] aggregator for histogram
}
18 changes: 18 additions & 0 deletions sdk/metric/internal/doc.go
@@ -0,0 +1,18 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package internal provides types and functionality used to aggregate and
// cycle the state of metric measurements made by the SDK. These types and
// functionality are meant only for internal SDK use.
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
80 changes: 80 additions & 0 deletions sdk/metric/internal/histogram.go
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

// histogram summarizes a set of measurements as an histogram with
// explicitly defined buckets.
type histogram[N int64 | float64] struct {
// TODO(#2970): implement.
}

func (s *histogram[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2970): implement.
}

// NewDeltaHistogram returns an Aggregator that summarizes a set of
// measurements as an histogram. Each histogram is scoped by attributes and
// the aggregation cycle the measurements were made in.
//
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
return &deltaHistogram[N]{}
}

// deltaHistogram summarizes a set of measurements made in a single
// aggregation cycle as an histogram with explicitly defined buckets.
type deltaHistogram[N int64 | float64] struct {
histogram[N]

// TODO(#2970): implement.
}

func (s *deltaHistogram[N]) Aggregations() []Aggregation {
// TODO(#2970): implement.
return nil
}

// NewCumulativeHistogram returns an Aggregator that summarizes a set of
// measurements as an histogram. Each histogram is scoped by attributes.
//
// Each aggregation cycle builds from the previous, the histogram counts are
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
return &cumulativeHistogram[N]{}
}

// cumulativeHistogram summarizes a set of measurements made over all
// aggregation cycles as an histogram with explicitly defined buckets.
type cumulativeHistogram[N int64 | float64] struct {
histogram[N]

// TODO(#2970): implement.
}

func (s *cumulativeHistogram[N]) Aggregations() []Aggregation {
// TODO(#2970): implement.
return nil
}

0 comments on commit c250227

Please sign in to comment.