diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go new file mode 100644 index 00000000000..68779bac37f --- /dev/null +++ b/sdk/metric/instrument.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" + "go.opentelemetry.io/otel/sdk/metric/internal" +) + +type instrumentImpl[N int64 | float64] struct { + instrument.Asynchronous + + aggregators []internal.Aggregator[N] +} + +var _ asyncfloat64.Counter = &instrumentImpl[float64]{} +var _ asyncfloat64.UpDownCounter = &instrumentImpl[float64]{} +var _ asyncfloat64.Gauge = &instrumentImpl[float64]{} +var _ asyncint64.Counter = &instrumentImpl[int64]{} +var _ asyncint64.UpDownCounter = &instrumentImpl[int64]{} +var _ asyncint64.Gauge = &instrumentImpl[int64]{} + +func (i *instrumentImpl[N]) Observe(ctx context.Context, val N, attrs ...attribute.KeyValue) { + // Only record a value if this is being called from the MetricProvider. + _, ok := ctx.Value(produceKey).(struct{}) + if !ok { + return + } + if err := ctx.Err(); err != nil { + return + } + for _, agg := range i.aggregators { + agg.Aggregate(val, attribute.NewSet(attrs...)) + } +} diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go new file mode 100644 index 00000000000..87a8723959f --- /dev/null +++ b/sdk/metric/instrument_provider.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "fmt" + + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/asyncfloat64" + "go.opentelemetry.io/otel/metric/instrument/asyncint64" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/view" +) + +type asyncInt64Provider struct { + scope instrumentation.Scope + registry *pipelineRegistry[int64] +} + +var _ asyncint64.InstrumentProvider = asyncInt64Provider{} + +// Counter creates an instrument for recording increasing values. +func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncCounter, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + + return &instrumentImpl[int64]{ + aggregators: aggs, + }, err +} + +// UpDownCounter creates an instrument for recording changes of a value. +func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncUpDownCounter, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return &instrumentImpl[int64]{ + aggregators: aggs, + }, err +} + +// Gauge creates an instrument for recording the current value. +func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncGauge, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return &instrumentImpl[int64]{ + aggregators: aggs, + }, err +} + +type asyncFloat64Provider struct { + scope instrumentation.Scope + registry *pipelineRegistry[float64] +} + +var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} + +// Counter creates an instrument for recording increasing values. +func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncCounter, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return &instrumentImpl[float64]{ + aggregators: aggs, + }, err +} + +// UpDownCounter creates an instrument for recording changes of a value. +func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncUpDownCounter, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return &instrumentImpl[float64]{ + aggregators: aggs, + }, err +} + +// Gauge creates an instrument for recording the current value. +func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { + cfg := instrument.NewConfig(opts...) + + aggs, err := p.registry.createAggregators(view.Instrument{ + Scope: p.scope, + Name: name, + Description: cfg.Description(), + Kind: view.AsyncGauge, + }, cfg.Unit()) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return &instrumentImpl[float64]{ + aggregators: aggs, + }, err +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index c77a4fa54d8..08a1ba16025 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -44,6 +44,9 @@ type meterRegistry struct { sync.Mutex meters map[instrumentation.Scope]*meter + + intRegistry *pipelineRegistry[int64] + floatRegistry *pipelineRegistry[float64] } // Get returns a registered meter matching the instrumentation scope if it @@ -56,7 +59,11 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { defer r.Unlock() if r.meters == nil { - m := &meter{Scope: s} + m := &meter{ + Scope: s, + intRegistry: r.intRegistry, + floatRegistry: r.floatRegistry, + } r.meters = map[instrumentation.Scope]*meter{s: m} return m } @@ -66,7 +73,11 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { return m } - m = &meter{Scope: s} + m = &meter{ + Scope: s, + intRegistry: r.intRegistry, + floatRegistry: r.floatRegistry, + } r.meters[s] = m return m } @@ -93,7 +104,8 @@ func (r *meterRegistry) Range(f func(*meter) bool) { type meter struct { instrumentation.Scope - // TODO (#2815, 2814): implement. + intRegistry *pipelineRegistry[int64] + floatRegistry *pipelineRegistry[float64] } // Compile-time check meter implements metric.Meter. @@ -101,20 +113,19 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - // TODO (#2815): implement. - return nil + return asyncInt64Provider{scope: m.Scope, registry: m.intRegistry} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - // TODO (#2815): implement. - return nil + return asyncFloat64Provider{scope: m.Scope, registry: m.floatRegistry} } // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { - // TODO (#2815): implement. + // Because the pipelines are shared only one of the registries needs to be invoked + m.intRegistry.registerCallback(f) return nil } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 54ae767b20b..5f5bbc1bfff 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -18,11 +18,18 @@ package metric import ( + "context" + "sync" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) func TestMeterRegistry(t *testing.T) { @@ -64,3 +71,293 @@ func TestMeterRegistry(t *testing.T) { assert.Equal(t, 1, i, "iteration not stopped after first flase return") }) } + +// A meter should be able to make instruments concurrently. +func TestMeterInstrumentConcurrency(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(6) + + m := NewMeterProvider().Meter("inst-concurrency") + + go func() { + _, _ = m.AsyncFloat64().Counter("AFCounter") + wg.Done() + }() + go func() { + _, _ = m.AsyncFloat64().UpDownCounter("AFUpDownCounter") + wg.Done() + }() + go func() { + _, _ = m.AsyncFloat64().Gauge("AFGauge") + wg.Done() + }() + go func() { + _, _ = m.AsyncInt64().Counter("AICounter") + wg.Done() + }() + go func() { + _, _ = m.AsyncInt64().UpDownCounter("AIUpDownCounter") + wg.Done() + }() + go func() { + _, _ = m.AsyncInt64().Gauge("AIGauge") + wg.Done() + }() + + wg.Wait() +} + +// A Meter Should be able register Callbacks Concurrently. +func TestMeterCallbackCreationConcurrency(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(2) + + m := NewMeterProvider().Meter("callback-concurrency") + + go func() { + _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + wg.Done() + }() + go func() { + _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + wg.Done() + }() + wg.Wait() +} + +// Instruments should produce correct ResourceMetrics. +// TODO (2814): include sync instruments. +func TestMeterCreatesInstruments(t *testing.T) { + testCases := []struct { + name string + fn func(*testing.T, metric.Meter) + want metricdata.Metrics + }{ + { + name: "AsyncInt64Count", + fn: func(t *testing.T, m metric.Meter) { + ctr, err := m.AsyncInt64().Counter("aint") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 3) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + ctr.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "aint", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 3}, + }, + }, + }, + }, + { + name: "AsyncInt64UpDownCount", + fn: func(t *testing.T, m metric.Meter) { + ctr, err := m.AsyncInt64().UpDownCounter("aint") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 11) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + ctr.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "aint", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 11}, + }, + }, + }, + }, + { + name: "AsyncInt64Gauge", + fn: func(t *testing.T, m metric.Meter) { + gauge, err := m.AsyncInt64().Gauge("agauge") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + gauge.Observe(ctx, 11) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + gauge.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "agauge", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 11}, + }, + }, + }, + }, + { + name: "AsyncFloat64Count", + fn: func(t *testing.T, m metric.Meter) { + ctr, err := m.AsyncFloat64().Counter("afloat") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 3) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + ctr.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "afloat", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 3}, + }, + }, + }, + }, + { + name: "AsyncFloat64UpDownCount", + fn: func(t *testing.T, m metric.Meter) { + ctr, err := m.AsyncFloat64().UpDownCounter("afloat") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + ctr.Observe(ctx, 11) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + ctr.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "afloat", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 11}, + }, + }, + }, + }, + { + name: "AsyncFloat64Gauge", + fn: func(t *testing.T, m metric.Meter) { + gauge, err := m.AsyncFloat64().Gauge("agauge") + assert.NoError(t, err) + err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + gauge.Observe(ctx, 11) + }) + assert.NoError(t, err) + + // Observed outside of a callback, it should be ignored. + gauge.Observe(context.Background(), 19) + }, + want: metricdata.Metrics{ + Name: "agauge", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 11}, + }, + }, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + rdr := NewManualReader() + m := NewMeterProvider(WithReader(rdr)).Meter("testInstruments") + + tt.fn(t, m) + + rm, err := rdr.Collect(context.Background()) + assert.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + sm := rm.ScopeMetrics[0] + require.Len(t, sm.Metrics, 1) + got := sm.Metrics[0] + metricdatatest.AssertEqual(t, tt.want, got, metricdatatest.IgnoreTimestamp()) + }) + } +} + +func TestMetersProvideScope(t *testing.T) { + rdr := NewManualReader() + mp := NewMeterProvider(WithReader(rdr)) + + m1 := mp.Meter("scope1") + ctr1, err := m1.AsyncFloat64().Counter("ctr1") + assert.NoError(t, err) + err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { + ctr1.Observe(ctx, 5) + }) + assert.NoError(t, err) + + m2 := mp.Meter("scope2") + ctr2, err := m2.AsyncInt64().Counter("ctr2") + assert.NoError(t, err) + err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { + ctr2.Observe(ctx, 7) + }) + assert.NoError(t, err) + + want := metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: "scope1", + }, + Metrics: []metricdata.Metrics{ + { + Name: "ctr1", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[float64]{ + { + Value: 5, + }, + }, + }, + }, + }, + }, + { + Scope: instrumentation.Scope{ + Name: "scope2", + }, + Metrics: []metricdata.Metrics{ + { + Name: "ctr2", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 7, + }, + }, + }, + }, + }, + }, + }, + } + + got, err := rdr.Collect(context.Background()) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index d53417ffed4..b673329f897 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -104,6 +104,14 @@ func (p *pipeline) addCallback(callback func(context.Context)) { p.callbacks = append(p.callbacks, callback) } +// callbackKey is a context key type used to identify context that came from the SDK. +type callbackKey int + +// produceKey is the context key to tell if a Observe is called within a callback. +// Its value of zero is arbitrary. If this package defined other context keys, +// they would have different integer values. +const produceKey callbackKey = 0 + // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. @@ -111,6 +119,8 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err p.Lock() defer p.Unlock() + ctx = context.WithValue(ctx, produceKey, struct{}{}) + for _, callback := range p.callbacks { // TODO make the callbacks parallel. ( #3034 ) callback(ctx) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 8e9098b6a30..8f978c7d295 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -33,9 +32,7 @@ import ( type MeterProvider struct { res *resource.Resource - meters meterRegistry - readers map[Reader][]view.View - providers map[Reader]*pipeline + meters meterRegistry forceFlush, shutdown func(context.Context) error } @@ -54,17 +51,15 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() - providers := make(map[Reader]*pipeline, len(conf.readers)) - for reader := range conf.readers { - pipe := newPipeline(conf.res) - providers[reader] = pipe - reader.register(pipe) - } + intRegistry, floatRegistry := newPipelineRegistries(conf.readers) return &MeterProvider{ - res: conf.res, - readers: conf.readers, - providers: providers, + res: conf.res, + + meters: meterRegistry{ + intRegistry: intRegistry, + floatRegistry: floatRegistry, + }, forceFlush: flush, shutdown: sdown,