diff --git a/CHANGELOG.md b/CHANGELOG.md index 43c083fba4d..69e781dc659 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`. Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option. The views registered with the `MeterProvider` apply to all `Reader`s. (#3387) +- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260) +- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260) +- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260) +- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260) ### Fixed diff --git a/exporters/otlp/otlpmetric/client.go b/exporters/otlp/otlpmetric/client.go index 9bcbab44f48..0e522fa939a 100644 --- a/exporters/otlp/otlpmetric/client.go +++ b/exporters/otlp/otlpmetric/client.go @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric import ( "context" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) // Client handles the transmission of OTLP data to an OTLP receiving endpoint. type Client interface { + // Temporality returns the Temporality to use for an instrument kind. + Temporality(view.InstrumentKind) metricdata.Temporality + + // Aggregation returns the Aggregation to use for an instrument kind. + Aggregation(view.InstrumentKind) aggregation.Aggregation + // UploadMetrics transmits metric data to an OTLP receiver. // // All retry logic must be handled by UploadMetrics alone, the Exporter diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index 2967a5f7b24..296f500d411 100644 --- a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -21,7 +21,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -34,6 +36,20 @@ type exporter struct { shutdownOnce sync.Once } +// Temporality returns the Temporality to use for an instrument kind. +func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + e.clientMu.Lock() + defer e.clientMu.Unlock() + return e.client.Temporality(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + e.clientMu.Lock() + defer e.clientMu.Unlock() + return e.client.Aggregation(k) +} + // Export transforms and transmits metric data to an OTLP receiver. func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error { otlpRm, err := transform.ResourceMetrics(rm) @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error { e.shutdownOnce.Do(func() { e.clientMu.Lock() client := e.client - e.client = shutdownClient{} + e.client = shutdownClient{ + temporalitySelector: client.Temporality, + aggregationSelector: client.Aggregation, + } e.clientMu.Unlock() err = client.Shutdown(ctx) }) @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter { return &exporter{client: client} } -type shutdownClient struct{} +type shutdownClient struct { + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector +} func (c shutdownClient) err(ctx context.Context) error { if err := ctx.Err(); err != nil { @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error { return errShutdown } +func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error { return c.err(ctx) } diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index fd644140d37..972d0cbf8fe 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -21,7 +21,10 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -31,6 +34,14 @@ type client struct { n int } +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return metric.DefaultTemporalitySelector(k) +} + +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return metric.DefaultAggregationSelector(k) +} + func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error { c.n++ return nil diff --git a/exporters/otlp/otlpmetric/internal/oconf/options.go b/exporters/otlp/otlpmetric/internal/oconf/options.go index f7d44071442..cf5da7e40f9 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options.go @@ -27,6 +27,10 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -57,6 +61,9 @@ type ( // gRPC configurations GRPCCredentials credentials.TransportCredentials + + TemporalitySelector metric.TemporalitySelector + AggregationSelector metric.AggregationSelector } Config struct { @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config { URLPath: DefaultMetricsPath, Compression: NoCompression, Timeout: DefaultTimeout, + + TemporalitySelector: metric.DefaultTemporalitySelector, + AggregationSelector: metric.DefaultAggregationSelector, }, RetryConfig: retry.DefaultConfig, } @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config { URLPath: DefaultMetricsPath, Compression: NoCompression, Timeout: DefaultTimeout, + + TemporalitySelector: metric.DefaultTemporalitySelector, + AggregationSelector: metric.DefaultAggregationSelector, }, RetryConfig: retry.DefaultConfig, DialOptions: []grpc.DialOption{grpc.WithUserAgent(internal.GetUserAgentHeader())}, @@ -313,3 +326,32 @@ func WithTimeout(duration time.Duration) GenericOption { return cfg }) } + +func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.TemporalitySelector = selector + return cfg + }) +} + +func WithAggregationSelector(selector metric.AggregationSelector) GenericOption { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = metric.DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.AggregationSelector = wrapped + return cfg + }) +} diff --git a/exporters/otlp/otlpmetric/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/internal/oconf/options_test.go index e436eb5b07e..51dddd09533 100644 --- a/exporters/otlp/otlpmetric/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/internal/oconf/options_test.go @@ -23,6 +23,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/envconfig" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) const ( @@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) { assert.Equal(t, c.Metrics.Timeout, 5*time.Second) }, }, + + // Temporality Selector Tests + { + name: "WithTemporalitySelector", + opts: []oconf.GenericOption{ + oconf.WithTemporalitySelector(deltaSelector), + }, + asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) { + // Function value comparisons are disallowed, test non-default + // behavior of a TemporalitySelector here to ensure our "catch + // all" was set. + var undefinedKind view.InstrumentKind + got := c.Metrics.TemporalitySelector + assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind)) + }, + }, + + // Aggregation Selector Tests + { + name: "WithAggregationSelector", + opts: []oconf.GenericOption{ + oconf.WithAggregationSelector(dropSelector), + }, + asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) { + // Function value comparisons are disallowed, test non-default + // behavior of a AggregationSelector here to ensure our "catch + // all" was set. + var undefinedKind view.InstrumentKind + got := c.Metrics.AggregationSelector + assert.Equal(t, aggregation.Drop{}, got(undefinedKind)) + }, + }, } for _, tt := range tests { @@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) { } } +func dropSelector(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} +} + +func deltaSelector(view.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality +} + func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption { converted := make([]oconf.HTTPOption, len(opts)) for i, o := range opts { diff --git a/exporters/otlp/otlpmetric/internal/otest/client_test.go b/exporters/otlp/otlpmetric/internal/otest/client_test.go index 09f98ee809b..e701d10b8db 100644 --- a/exporters/otlp/otlpmetric/internal/otest/client_test.go +++ b/exporters/otlp/otlpmetric/internal/otest/client_test.go @@ -19,6 +19,10 @@ import ( "testing" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -27,6 +31,14 @@ type client struct { storage *Storage } +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return metric.DefaultTemporalitySelector(k) +} + +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return metric.DefaultAggregationSelector(k) +} + func (c *client) Collect() *Storage { return c.storage } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index 4c5beb8f384..0f1a0040f61 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -28,6 +28,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -53,6 +56,9 @@ type client struct { exportTimeout time.Duration requestFunc retry.RequestFunc + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector + // ourConn keeps track of where conn was created: true if created here in // NewClient, or false if passed with an option. This is important on // Shutdown as the conn should only be closed if we created it. Otherwise, @@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error exportTimeout: cfg.Metrics.Timeout, requestFunc: cfg.RetryConfig.RequestFunc(retryable), conn: cfg.GRPCConn, + + temporalitySelector: cfg.Metrics.TemporalitySelector, + aggregationSelector: cfg.Metrics.AggregationSelector, } if len(cfg.Metrics.Headers) > 0 { @@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error return c, nil } +// Temporality returns the Temporality to use for an instrument kind. +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + // ForceFlush does nothing, the client holds no state. func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go index 5f7a66e559e..3b9539c7187 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric" ) // Option applies a configuration option to the Exporter. @@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option { func WithRetry(settings RetryConfig) Option { return wrappedOption{oconf.WithRetry(retry.Config(settings))} } + +// WithTemporalitySelector sets the TemporalitySelector the client will use to +// determine the Temporality of an instrument based on its kind. If this option +// is not used, the client will use the DefaultTemporalitySelector from the +// go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return wrappedOption{oconf.WithTemporalitySelector(selector)} +} + +// WithAggregationSelector sets the AggregationSelector the client will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation +// explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + return wrappedOption{oconf.WithAggregationSelector(selector)} +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 0840a2c8fa6..9d1bac13f49 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -34,6 +34,9 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -55,6 +58,9 @@ type client struct { compression Compression requestFunc retry.RequestFunc httpClient *http.Client + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -116,9 +122,22 @@ func newClient(opts ...Option) (otlpmetric.Client, error) { req: req, requestFunc: cfg.RetryConfig.RequestFunc(evaluate), httpClient: httpClient, + + temporalitySelector: cfg.Metrics.TemporalitySelector, + aggregationSelector: cfg.Metrics.AggregationSelector, }, nil } +// Temporality returns the Temporality to use for an instrument kind. +func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality { + return c.temporalitySelector(k) +} + +// Aggregation returns the Aggregation to use for an instrument kind. +func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return c.aggregationSelector(k) +} + // ForceFlush does nothing, the client holds no state. func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go index c320c25d406..e9f7c774642 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" + "go.opentelemetry.io/otel/sdk/metric" ) // Compression describes the compression used for payloads sent to the @@ -179,3 +180,20 @@ func WithTimeout(duration time.Duration) Option { func WithRetry(rc RetryConfig) Option { return wrappedOption{oconf.WithRetry(retry.Config(rc))} } + +// WithTemporalitySelector sets the TemporalitySelector the client will use to +// determine the Temporality of an instrument based on its kind. If this option +// is not used, the client will use the DefaultTemporalitySelector from the +// go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return wrappedOption{oconf.WithTemporalitySelector(selector)} +} + +// WithAggregationSelector sets the AggregationSelector the client will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation +// explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + return wrappedOption{oconf.WithAggregationSelector(selector)} +} diff --git a/exporters/stdout/stdoutmetric/config.go b/exporters/stdout/stdoutmetric/config.go index c2f02da1ecf..2b40806c96d 100644 --- a/exporters/stdout/stdoutmetric/config.go +++ b/exporters/stdout/stdoutmetric/config.go @@ -16,11 +16,18 @@ package stdoutmetric // import "go.opentelemetry.io/otel/exporters/stdout/stdout import ( "encoding/json" "os" + + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/view" ) // config contains options for the exporter. type config struct { - encoder *encoderHolder + encoder *encoderHolder + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // newConfig creates a validated config configured with options. @@ -36,6 +43,14 @@ func newConfig(options ...Option) config { cfg.encoder = &encoderHolder{encoder: enc} } + if cfg.temporalitySelector == nil { + cfg.temporalitySelector = metric.DefaultTemporalitySelector + } + + if cfg.aggregationSelector == nil { + cfg.aggregationSelector = metric.DefaultAggregationSelector + } + return cfg } @@ -60,3 +75,54 @@ func WithEncoder(encoder Encoder) Option { return c }) } + +// WithTemporalitySelector sets the TemporalitySelector the exporter will use +// to determine the Temporality of an instrument based on its kind. If this +// option is not used, the exporter will use the DefaultTemporalitySelector +// from the go.opentelemetry.io/otel/sdk/metric package. +func WithTemporalitySelector(selector metric.TemporalitySelector) Option { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector metric.TemporalitySelector +} + +func (t temporalitySelectorOption) apply(c config) config { + c.temporalitySelector = t.selector + return c +} + +// WithAggregationSelector sets the AggregationSelector the exporter will use +// to determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the exporter will use the +// DefaultAggregationSelector from the go.opentelemetry.io/otel/sdk/metric +// package or the aggregation explicitly passed for a view matching an +// instrument. +func WithAggregationSelector(selector metric.AggregationSelector) Option { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = metric.DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return aggregationSelectorOption{selector: wrapped} +} + +type aggregationSelectorOption struct { + selector metric.AggregationSelector +} + +func (t aggregationSelectorOption) apply(c config) config { + c.aggregationSelector = t.selector + return c +} diff --git a/exporters/stdout/stdoutmetric/exporter.go b/exporters/stdout/stdoutmetric/exporter.go index fef517fa833..8a9d55a4979 100644 --- a/exporters/stdout/stdoutmetric/exporter.go +++ b/exporters/stdout/stdoutmetric/exporter.go @@ -20,7 +20,9 @@ import ( "sync/atomic" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) // exporter is an OpenTelemetry metric exporter. @@ -28,6 +30,9 @@ type exporter struct { encVal atomic.Value // encoderHolder shutdownOnce sync.Once + + temporalitySelector metric.TemporalitySelector + aggregationSelector metric.AggregationSelector } // New returns a configured metric exporter. @@ -36,11 +41,22 @@ type exporter struct { // encoder with tab indentations that output to STDOUT. func New(options ...Option) (metric.Exporter, error) { cfg := newConfig(options...) - exp := &exporter{} + exp := &exporter{ + temporalitySelector: cfg.temporalitySelector, + aggregationSelector: cfg.aggregationSelector, + } exp.encVal.Store(*cfg.encoder) return exp, nil } +func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + return e.temporalitySelector(k) +} + +func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + return e.aggregationSelector(k) +} + func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error { select { case <-ctx.Done(): diff --git a/exporters/stdout/stdoutmetric/exporter_test.go b/exporters/stdout/stdoutmetric/exporter_test.go index fa2a05401e8..a7c3abb9bd1 100644 --- a/exporters/stdout/stdoutmetric/exporter_test.go +++ b/exporters/stdout/stdoutmetric/exporter_test.go @@ -25,7 +25,9 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) func testEncoderOption() stdoutmetric.Option { @@ -97,3 +99,33 @@ func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) { require.NoError(t, exp.Shutdown(ctx)) assert.EqualError(t, exp.Export(ctx, data), "exporter shutdown") } + +func deltaSelector(view.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality +} + +func TestTemporalitySelector(t *testing.T) { + exp, err := stdoutmetric.New( + testEncoderOption(), + stdoutmetric.WithTemporalitySelector(deltaSelector), + ) + require.NoError(t, err) + + var unknownKind view.InstrumentKind + assert.Equal(t, metricdata.DeltaTemporality, exp.Temporality(unknownKind)) +} + +func dropSelector(view.InstrumentKind) aggregation.Aggregation { + return aggregation.Drop{} +} + +func TestAggregationSelector(t *testing.T) { + exp, err := stdoutmetric.New( + testEncoderOption(), + stdoutmetric.WithAggregationSelector(dropSelector), + ) + require.NoError(t, err) + + var unknownKind view.InstrumentKind + assert.Equal(t, aggregation.Drop{}, exp.Aggregation(unknownKind)) +} diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 79257db0fc6..687d3eae9a6 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -18,7 +18,9 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/view" ) // ErrExporterShutdown is returned if Export or Shutdown are called after an @@ -28,6 +30,12 @@ var ErrExporterShutdown = fmt.Errorf("exporter is shutdown") // Exporter handles the delivery of metric data to external receivers. This is // the final component in the metric push pipeline. type Exporter interface { + // Temporality returns the Temporality to use for an instrument kind. + Temporality(view.InstrumentKind) metricdata.Temporality + + // Aggregation returns the Aggregation to use for an instrument kind. + Aggregation(view.InstrumentKind) aggregation.Aggregation + // Export serializes and transmits metric data to a receiver. // // This is called synchronously, there is no concurrency safety diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index ad932be793b..62ccf0f0535 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -129,3 +129,53 @@ func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig { type ManualReaderOption interface { applyManual(manualReaderConfig) manualReaderConfig } + +// WithTemporalitySelector sets the TemporalitySelector a reader will use to +// determine the Temporality of an instrument based on its kind. If this +// option is not used, the reader will use the DefaultTemporalitySelector. +func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector func(instrument view.InstrumentKind) metricdata.Temporality +} + +// applyManual returns a manualReaderConfig with option applied. +func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { + mrc.temporalitySelector = t.selector + return mrc +} + +// WithAggregationSelector sets the AggregationSelector a reader will use to +// determine the aggregation to use for an instrument based on its kind. If +// this option is not used, the reader will use the DefaultAggregationSelector +// or the aggregation explicitly passed for a view matching an instrument. +func WithAggregationSelector(selector AggregationSelector) ManualReaderOption { + // Deep copy and validate before using. + wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { + a := selector(ik) + cpA := a.Copy() + if err := cpA.Err(); err != nil { + cpA = DefaultAggregationSelector(ik) + global.Error( + err, "using default aggregation instead", + "aggregation", a, + "replacement", cpA, + ) + } + return cpA + } + + return aggregationSelectorOption{selector: wrapped} +} + +type aggregationSelectorOption struct { + selector AggregationSelector +} + +// applyManual returns a manualReaderConfig with option applied. +func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.aggregationSelector = t.selector + return c +} diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3f705086162..3dc7ed2d045 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -36,20 +36,16 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector + interval time.Duration + timeout time.Duration } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ - interval: defaultInterval, - timeout: defaultTimeout, - temporalitySelector: DefaultTemporalitySelector, - aggregationSelector: DefaultAggregationSelector, + interval: defaultInterval, + timeout: defaultTimeout, } for _, o := range options { c = o.applyPeriodic(c) @@ -118,9 +114,6 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade flushCh: make(chan chan error), cancel: cancel, done: make(chan struct{}), - - temporalitySelector: conf.temporalitySelector, - aggregationSelector: conf.aggregationSelector, } go func() { @@ -140,9 +133,6 @@ type periodicReader struct { exporter Exporter flushCh chan chan error - temporalitySelector TemporalitySelector - aggregationSelector AggregationSelector - done chan struct{} cancel context.CancelFunc shutdownOnce sync.Once @@ -187,12 +177,12 @@ func (r *periodicReader) register(p producer) { // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind view.InstrumentKind) metricdata.Temporality { - return r.temporalitySelector(kind) + return r.exporter.Temporality(kind) } // aggregation returns what Aggregation to use for kind. func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. - return r.aggregationSelector(kind) + return r.exporter.Aggregation(kind) } // collectAndExport gather all metric data related to the periodicReader r from diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 8c3c0599158..c0bf06a3086 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/suite" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/view" ) @@ -54,13 +55,29 @@ func TestWithInterval(t *testing.T) { } type fnExporter struct { - exportFunc func(context.Context, metricdata.ResourceMetrics) error - flushFunc func(context.Context) error - shutdownFunc func(context.Context) error + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + exportFunc func(context.Context, metricdata.ResourceMetrics) error + flushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Exporter = (*fnExporter)(nil) +func (e *fnExporter) Temporality(k view.InstrumentKind) metricdata.Temporality { + if e.temporalityFunc != nil { + return e.temporalityFunc(k) + } + return DefaultTemporalitySelector(k) +} + +func (e *fnExporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation { + if e.aggregationFunc != nil { + return e.aggregationFunc(k) + } + return DefaultAggregationSelector(k) +} + func (e *fnExporter) Export(ctx context.Context, m metricdata.ResourceMetrics) error { if e.exportFunc != nil { return e.exportFunc(ctx, m) @@ -230,29 +247,25 @@ func BenchmarkPeriodicReader(b *testing.B) { func TestPeriodiclReaderTemporality(t *testing.T) { tests := []struct { - name string - options []PeriodicReaderOption + name string + exporter *fnExporter // Currently only testing constant temporality. This should be expanded // if we put more advanced selection in the SDK wantTemporality metricdata.Temporality }{ { name: "default", + exporter: new(fnExporter), wantTemporality: metricdata.CumulativeTemporality, }, { - name: "delta", - options: []PeriodicReaderOption{ - WithTemporalitySelector(deltaTemporalitySelector), - }, + name: "delta", + exporter: &fnExporter{temporalityFunc: deltaTemporalitySelector}, wantTemporality: metricdata.DeltaTemporality, }, { - name: "repeats overwrite", - options: []PeriodicReaderOption{ - WithTemporalitySelector(deltaTemporalitySelector), - WithTemporalitySelector(cumulativeTemporalitySelector), - }, + name: "cumulative", + exporter: &fnExporter{temporalityFunc: cumulativeTemporalitySelector}, wantTemporality: metricdata.CumulativeTemporality, }, } @@ -260,8 +273,8 @@ func TestPeriodiclReaderTemporality(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var undefinedInstrument view.InstrumentKind - rdr := NewPeriodicReader(new(fnExporter), tt.options...) - assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument)) + rdr := NewPeriodicReader(tt.exporter) + assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String()) }) } } diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index e5a1282db6b..53c13c4bffe 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/view" @@ -108,13 +107,6 @@ func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, return metricdata.ResourceMetrics{}, ErrReaderShutdown } -// ReaderOption applies a configuration option value to either a ManualReader or -// a PeriodicReader. -type ReaderOption interface { - ManualReaderOption - PeriodicReaderOption -} - // TemporalitySelector selects the temporality to use based on the InstrumentKind. type TemporalitySelector func(view.InstrumentKind) metricdata.Temporality @@ -125,29 +117,6 @@ func DefaultTemporalitySelector(view.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality } -// WithTemporalitySelector sets the TemporalitySelector a reader will use to -// determine the Temporality of an instrument based on its kind. If this -// option is not used, the reader will use the DefaultTemporalitySelector. -func WithTemporalitySelector(selector TemporalitySelector) ReaderOption { - return temporalitySelectorOption{selector: selector} -} - -type temporalitySelectorOption struct { - selector func(instrument view.InstrumentKind) metricdata.Temporality -} - -// applyManual returns a manualReaderConfig with option applied. -func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { - mrc.temporalitySelector = t.selector - return mrc -} - -// applyPeriodic returns a periodicReaderConfig with option applied. -func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig { - prc.temporalitySelector = t.selector - return prc -} - // AggregationSelector selects the aggregation and the parameters to use for // that aggregation based on the InstrumentKind. type AggregationSelector func(view.InstrumentKind) aggregation.Aggregation @@ -172,42 +141,3 @@ func DefaultAggregationSelector(ik view.InstrumentKind) aggregation.Aggregation } panic("unknown instrument kind") } - -// WithAggregationSelector sets the AggregationSelector a reader will use to -// determine the aggregation to use for an instrument based on its kind. If -// this option is not used, the reader will use the DefaultAggregationSelector -// or the aggregation explicitly passed for a view matching an instrument. -func WithAggregationSelector(selector AggregationSelector) ReaderOption { - // Deep copy and validate before using. - wrapped := func(ik view.InstrumentKind) aggregation.Aggregation { - a := selector(ik) - cpA := a.Copy() - if err := cpA.Err(); err != nil { - cpA = DefaultAggregationSelector(ik) - global.Error( - err, "using default aggregation instead", - "aggregation", a, - "replacement", cpA, - ) - } - return cpA - } - - return aggregationSelectorOption{selector: wrapped} -} - -type aggregationSelectorOption struct { - selector AggregationSelector -} - -// applyManual returns a manualReaderConfig with option applied. -func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig { - c.aggregationSelector = t.selector - return c -} - -// applyPeriodic returns a periodicReaderConfig with option applied. -func (t aggregationSelectorOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { - c.aggregationSelector = t.selector - return c -}