Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Aggregation/Temporality selection to the Exporter interface #3260

Merged
merged 29 commits into from Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
80fae64
Add Aggregation/Temporality to Exporter iface
MrAlias Oct 4, 2022
d96beed
Use Exporter selectors in periodic reader
MrAlias Oct 4, 2022
478dc74
Move selector opts to just manual reader
MrAlias Oct 4, 2022
3e799aa
Simplify periodic reader ref to Exporter selectors
MrAlias Oct 4, 2022
1a3c419
Fix the periodic reader tests
MrAlias Oct 4, 2022
fc30414
Add Aggregation/Temporality method to stdoutmetric
MrAlias Oct 4, 2022
b9d0022
Add Temporality/Aggregation to otlpmetric exp
MrAlias Oct 4, 2022
a2af7a1
Add Temporality/Aggregation to http/grpc otlp clients
MrAlias Oct 4, 2022
d218965
Add oconf tests for selector opts
MrAlias Oct 5, 2022
7962e53
Add tests to stdoutmetric for opts
MrAlias Oct 5, 2022
3e981b1
Correct comment subject
MrAlias Oct 5, 2022
9a1ecca
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 6, 2022
40a9694
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 12, 2022
7bf425d
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 17, 2022
b4abd3d
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 18, 2022
4972b9d
Add changes to changelog
MrAlias Oct 18, 2022
780d18f
Fix otest test client
MrAlias Oct 18, 2022
9765332
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 18, 2022
8b1e1cd
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 18, 2022
21c7c5b
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 19, 2022
82aeaa6
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 20, 2022
9650330
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 21, 2022
485a905
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 27, 2022
e562617
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 27, 2022
ae0715f
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 28, 2022
701579c
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 28, 2022
224b4a3
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 31, 2022
7f0c8e8
Merge branch 'main' into exp-agg-tempor
MrAlias Oct 31, 2022
23c85af
Merge branch 'main' into exp-agg-tempor
MrAlias Nov 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260)
- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260)
- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260)

### Fixed

Expand Down
9 changes: 9 additions & 0 deletions exporters/otlp/otlpmetric/client.go
Expand Up @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric
import (
"context"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality

// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation

// UploadMetrics transmits metric data to an OTLP receiver.
//
// All retry logic must be handled by UploadMetrics alone, the Exporter
Expand Down
34 changes: 32 additions & 2 deletions exporters/otlp/otlpmetric/exporter.go
Expand Up @@ -21,7 +21,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -34,6 +36,20 @@ type exporter struct {
shutdownOnce sync.Once
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Aggregation(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
Expand Down Expand Up @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error {
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.client = shutdownClient{
temporalitySelector: client.Temporality,
aggregationSelector: client.Aggregation,
}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
Expand All @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter {
return &exporter{client: client}
}

type shutdownClient struct{}
type shutdownClient struct {
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
Expand All @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error {
return errShutdown
}

func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error {
return c.err(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/otlpmetric/exporter_test.go
Expand Up @@ -21,7 +21,10 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -31,6 +34,14 @@ type client struct {
n int
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error {
c.n++
return nil
Expand Down
42 changes: 42 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options.go
Expand Up @@ -27,6 +27,10 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -57,6 +61,9 @@ type (

// gRPC configurations
GRPCCredentials credentials.TransportCredentials

TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}

Config struct {
Expand All @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
Expand All @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
DialOptions: []grpc.DialOption{grpc.WithUserAgent(internal.GetUserAgentHeader())},
Expand Down Expand Up @@ -313,3 +326,32 @@ func WithTimeout(duration time.Duration) GenericOption {
return cfg
})
}

func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.TemporalitySelector = selector
return cfg
})
}

func WithAggregationSelector(selector metric.AggregationSelector) GenericOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}

return newGenericOption(func(cfg Config) Config {
cfg.Metrics.AggregationSelector = wrapped
return cfg
})
}
43 changes: 43 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options_test.go
Expand Up @@ -23,6 +23,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal/envconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) {
assert.Equal(t, c.Metrics.Timeout, 5*time.Second)
},
},

// Temporality Selector Tests
{
name: "WithTemporalitySelector",
opts: []oconf.GenericOption{
oconf.WithTemporalitySelector(deltaSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a TemporalitySelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.TemporalitySelector
assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind))
},
},

// Aggregation Selector Tests
{
name: "WithAggregationSelector",
opts: []oconf.GenericOption{
oconf.WithAggregationSelector(dropSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a AggregationSelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.AggregationSelector
assert.Equal(t, aggregation.Drop{}, got(undefinedKind))
},
},
}

for _, tt := range tests {
Expand All @@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) {
}
}

func dropSelector(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}

func deltaSelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}

func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption {
converted := make([]oconf.HTTPOption, len(opts))
for i, o := range opts {
Expand Down
12 changes: 12 additions & 0 deletions exporters/otlp/otlpmetric/internal/otest/client_test.go
Expand Up @@ -19,6 +19,10 @@ import (
"testing"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -27,6 +31,14 @@ type client struct {
storage *Storage
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) Collect() *Storage {
return c.storage
}
Expand Down
19 changes: 19 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Expand Up @@ -28,6 +28,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -53,6 +56,9 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc

temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector

// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
Expand All @@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,

temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}

if len(cfg.Metrics.Headers) > 0 {
Expand All @@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
return c, nil
}

// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/config.go
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)

// Option applies a configuration option to the Exporter.
Expand Down Expand Up @@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(settings RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(settings))}
}

// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}

// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}