
Meter provider doesn't work with multiple readers #3720

Closed
songjiayang opened this issue Feb 13, 2023 · 8 comments · Fixed by #3724
Assignees
Labels
area:metrics Part of OpenTelemetry Metrics bug Something isn't working

Comments


songjiayang commented Feb 13, 2023

Description

I upgraded the metric SDK from v0.32.0 to v0.36.0 and found that the MeterProvider does not support registering multiple readers; it only pushes data to the first one.

Environment

  • OS: [linux]
  • Architecture: [x86]
  • Go Version: [go1.18.6]
  • opentelemetry-go version: [v0.36.0]

Steps To Reproduce

  1. Add a stdout exporter to example/prometheus/main.go#L50, with code like:
stdExporter, _ := stdoutmetric.New()
exporter, _ := prometheus.New()
provider := metric.NewMeterProvider(
  metric.WithReader(
	  metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
  ),
  metric.WithReader(exporter),
)

P.S.: the stdout metric exporter is registered as the first reader.

  2. After running the example code, only the stdout logs contain the metrics; curl localhost:2223/metrics | grep baz_bucket returns nothing.

Expected behavior

Both the stdout output and the Prometheus HTTP response include the metrics.

@songjiayang songjiayang added the bug Something isn't working label Feb 13, 2023

MrAlias commented Feb 13, 2023

Verified with this modified version of the prometheus example:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	api "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/instrument"
	"go.opentelemetry.io/otel/sdk/metric"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	ctx := context.Background()

	// The exporter embeds a default OpenTelemetry Reader and
	// implements prometheus.Collector, allowing it to be used as
	// both a Reader and Collector.
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatal(err)
	}
	stdExporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}
	provider := metric.NewMeterProvider(
		metric.WithReader(
			metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
		),
		metric.WithReader(exporter),
	)
	meter := provider.Meter("github.com/open-telemetry/opentelemetry-go/example/prometheus")

	// Start the prometheus HTTP server and pass the exporter Collector to it
	go serveMetrics()

	attrs := []attribute.KeyValue{
		attribute.Key("A").String("B"),
		attribute.Key("C").String("D"),
	}

	// This is the equivalent of prometheus.NewCounterVec
	counter, err := meter.Float64Counter("foo", instrument.WithDescription("a simple counter"))
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(ctx, 5, attrs...)

	gauge, err := meter.Float64ObservableGauge("bar", instrument.WithDescription("a fun little gauge"))
	if err != nil {
		log.Fatal(err)
	}
	_, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
		n := -10. + rand.Float64()*(90.) // [-10, 100)
		o.ObserveFloat64(gauge, n, attrs...)
		return nil
	}, gauge)
	if err != nil {
		log.Fatal(err)
	}

	// This is the equivalent of prometheus.NewHistogramVec
	histogram, err := meter.Float64Histogram("baz", instrument.WithDescription("a very nice histogram"))
	if err != nil {
		log.Fatal(err)
	}
	histogram.Record(ctx, 23, attrs...)
	histogram.Record(ctx, 7, attrs...)
	histogram.Record(ctx, 101, attrs...)
	histogram.Record(ctx, 105, attrs...)

	ctx, _ = signal.NotifyContext(ctx, os.Interrupt)
	<-ctx.Done()
}

func serveMetrics() {
	log.Printf("serving metrics at localhost:2223/metrics")
	http.Handle("/metrics", promhttp.Handler())
	err := http.ListenAndServe(":2223", nil)
	if err != nil {
		fmt.Printf("error serving http: %v", err)
		return
	}
}

Running that code and querying http://localhost:2223/metrics returns Prometheus system metrics, but not the OTel metrics defined in the example. The stdout exporter correctly outputs them instead.

Switching the order of the registered readers reverses the behavior: the Prometheus endpoint exposes the metrics, but not the stdout exporter.
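
For reference, this is a minimal sketch of the swapped registration order used for that check, reusing the exporter and stdExporter variables from the example above:

	// Prometheus exporter registered first, periodic stdout reader second.
	// With this ordering only the Prometheus endpoint exposed the metrics.
	provider := metric.NewMeterProvider(
		metric.WithReader(exporter),
		metric.WithReader(
			metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
		),
	)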

@MrAlias MrAlias added the area:metrics Part of OpenTelemetry Metrics label Feb 13, 2023
@MrAlias MrAlias added this to the Metric v0.37.0 milestone Feb 13, 2023

MrAlias commented Feb 13, 2023

go.mod:

module go.opentelemetry.io/otel/example/prometheus

go 1.18

require (
	github.com/prometheus/client_golang v1.14.0
	go.opentelemetry.io/otel v1.13.0
	go.opentelemetry.io/otel/exporters/prometheus v0.36.0
	go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.36.0
	go.opentelemetry.io/otel/metric v0.36.0
	go.opentelemetry.io/otel/sdk/metric v0.36.0
)

require (
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/cespare/xxhash/v2 v2.1.2 // indirect
	github.com/go-logr/logr v1.2.3 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
	github.com/prometheus/client_model v0.3.0 // indirect
	github.com/prometheus/common v0.37.0 // indirect
	github.com/prometheus/procfs v0.8.0 // indirect
	go.opentelemetry.io/otel/sdk v1.13.0 // indirect
	go.opentelemetry.io/otel/trace v1.13.0 // indirect
	golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
	google.golang.org/protobuf v1.28.1 // indirect
)

replace go.opentelemetry.io/otel => ../..

replace go.opentelemetry.io/otel/exporters/prometheus => ../../exporters/prometheus

replace go.opentelemetry.io/otel/sdk => ../../sdk

replace go.opentelemetry.io/otel/sdk/metric => ../../sdk/metric

replace go.opentelemetry.io/otel/metric => ../../metric

replace go.opentelemetry.io/otel/trace => ../../trace


MrAlias commented Feb 13, 2023

This does not appear to be unique to the Prometheus and stdout exporter combination. Adding another periodic reader for the stdout exporter produces blank metrics for one of the readers:

	// ...
	provider := metric.NewMeterProvider(
		metric.WithReader(
			metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
		),
		metric.WithReader(
			metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
		),
		metric.WithReader(exporter),
	)
	// ...
{
	"Resource": [
		{
			"Key": "service.name",
			"Value": {
				"Type": "STRING",
				"Value": "unknown_service:prometheus"
			}
		},
		{
			"Key": "telemetry.sdk.language",
			"Value": {
				"Type": "STRING",
				"Value": "go"
			}
		},
		{
			"Key": "telemetry.sdk.name",
			"Value": {
				"Type": "STRING",
				"Value": "opentelemetry"
			}
		},
		{
			"Key": "telemetry.sdk.version",
			"Value": {
				"Type": "STRING",
				"Value": "1.13.0"
			}
		}
	],
	"ScopeMetrics": []
}
{
	"Resource": [
		{
			"Key": "service.name",
			"Value": {
				"Type": "STRING",
				"Value": "unknown_service:prometheus"
			}
		},
		{
			"Key": "telemetry.sdk.language",
			"Value": {
				"Type": "STRING",
				"Value": "go"
			}
		},
		{
			"Key": "telemetry.sdk.name",
			"Value": {
				"Type": "STRING",
				"Value": "opentelemetry"
			}
		},
		{
			"Key": "telemetry.sdk.version",
			"Value": {
				"Type": "STRING",
				"Value": "1.13.0"
			}
		}
	],
	"ScopeMetrics": [
		{
			"Scope": {
				"Name": "github.com/open-telemetry/opentelemetry-go/example/prometheus",
				"Version": "",
				"SchemaURL": ""
			},
			"Metrics": [
				{
					"Name": "foo",
					"Description": "a simple counter",
					"Unit": "",
					"Data": {
						"DataPoints": [
							{
								"Attributes": [
									{
										"Key": "A",
										"Value": {
											"Type": "STRING",
											"Value": "B"
										}
									},
									{
										"Key": "C",
										"Value": {
											"Type": "STRING",
											"Value": "D"
										}
									}
								],
								"StartTime": "2023-02-13T09:29:16.595389195-08:00",
								"Time": "2023-02-13T09:29:24.600780807-08:00",
								"Value": 15
							}
						],
						"Temporality": "CumulativeTemporality",
						"IsMonotonic": true
					}
				},
				{
					"Name": "bar",
					"Description": "a fun little gauge",
					"Unit": "",
					"Data": {
						"DataPoints": [
							{
								"Attributes": [
									{
										"Key": "A",
										"Value": {
											"Type": "STRING",
											"Value": "B"
										}
									},
									{
										"Key": "C",
										"Value": {
											"Type": "STRING",
											"Value": "D"
										}
									}
								],
								"StartTime": "0001-01-01T00:00:00Z",
								"Time": "2023-02-13T09:29:24.600773983-08:00",
								"Value": 64.90373943201422
							}
						]
					}
				},
				{
					"Name": "baz",
					"Description": "a very nice histogram",
					"Unit": "",
					"Data": {
						"DataPoints": [
							{
								"Attributes": [
									{
										"Key": "A",
										"Value": {
											"Type": "STRING",
											"Value": "B"
										}
									},
									{
										"Key": "C",
										"Value": {
											"Type": "STRING",
											"Value": "D"
										}
									}
								],
								"StartTime": "2023-02-13T09:29:16.595412461-08:00",
								"Time": "2023-02-13T09:29:24.600788031-08:00",
								"Count": 12,
								"Bounds": [
									0,
									5,
									10,
									25,
									50,
									75,
									100,
									250,
									500,
									750,
									1000,
									2500,
									5000,
									7500,
									10000
								],
								"BucketCounts": [
									0,
									0,
									3,
									3,
									0,
									0,
									0,
									6,
									0,
									0,
									0,
									0,
									0,
									0,
									0,
									0
								],
								"Min": {},
								"Max": {},
								"Sum": 708
							}
						],
						"Temporality": "CumulativeTemporality"
					}
				}
			]
		}
	]
}


MrAlias commented Feb 13, 2023

This looks to be a bug in instrument caching. The creation of a new aggregator in the cache is tied to the addition of that aggregator to a pipeline:

return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
	agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
	if err != nil {
		return nil, err
	}
	if agg == nil { // Drop aggregator.
		return nil, nil
	}
	if stream.AttributeFilter != nil {
		agg = internal.NewFilter(agg, stream.AttributeFilter)
	}
	i.pipeline.addSync(scope, instrumentSync{
		name:        stream.Name,
		description: stream.Description,
		unit:        stream.Unit,
		aggregator:  agg,
	})
	return agg, err
})

What is happening is that the pipeline for the first reader correctly resolves the aggregator and adds it to its pipeline. The second reader, and any others, resolve the already cached aggregator. Since it was already cached, it is not added to the new pipeline.
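
A minimal, self-contained sketch (not the SDK's actual cache type) of why that happens: when the only place an aggregator is added to a pipeline is inside the miss function of a once-per-key lookup, only the first pipeline to ask ever runs that code:

package main

import "fmt"

// lookupCache is an illustrative stand-in for the SDK's instrument cache:
// create runs only on a cache miss for the given key.
type lookupCache[K comparable, V any] struct {
	data map[K]V
}

func (c *lookupCache[K, V]) Lookup(key K, create func() V) V {
	if v, ok := c.data[key]; ok {
		return v // cache hit: create, and any side effects in it, never run
	}
	v := create()
	c.data[key] = v
	return v
}

func main() {
	c := &lookupCache[string, int]{data: map[string]int{}}
	for pipeline := 1; pipeline <= 2; pipeline++ {
		c.Lookup("foo", func() int {
			// Stand-in for pipeline.addSync: only pipeline 1 reaches this line.
			fmt.Println("adding aggregator to pipeline", pipeline)
			return 42
		})
	}
}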


MrAlias commented Feb 13, 2023

I was able to verify the following change resolves the bug:

	agg, err := i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
		agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
		if err != nil {
			return nil, err
		}
		if agg == nil { // Drop aggregator.
			return nil, nil
		}
		if stream.AttributeFilter != nil {
			agg = internal.NewFilter(agg, stream.AttributeFilter)
		}

		return agg, err
	})
	if err == nil {
		i.pipeline.addSync(scope, instrumentSync{
			name:        stream.Name,
			description: stream.Description,
			unit:        stream.Unit,
			aggregator:  agg,
		})
	}
	return agg, err

This still needs to handle the case where the same instrument is requested multiple times; in that situation the aggregator should not be added to the pipeline after the first time. A cache is probably needed here as well.
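
A hypothetical sketch of that guard, with illustrative names (not the SDK's): a per-pipeline "already added" set so an aggregator is registered with the pipeline at most once, no matter how many times the same instrument is requested:

package main

import "fmt"

// instrumentKey stands in for the SDK's instrument identity (name, kind, unit, ...).
type instrumentKey string

// onceAdder remembers which instruments were already added to its pipeline.
type onceAdder struct {
	added map[instrumentKey]bool
}

func (o *onceAdder) addSyncOnce(key instrumentKey, add func()) {
	if o.added[key] {
		return // aggregator already registered with this pipeline
	}
	o.added[key] = true
	add()
}

func main() {
	o := &onceAdder{added: map[instrumentKey]bool{}}
	for i := 0; i < 3; i++ {
		o.addSyncOnce("foo", func() {
			fmt.Println("adding aggregator for foo") // printed once
		})
	}
}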

@MrAlias MrAlias self-assigned this Feb 13, 2023
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this issue Feb 13, 2023
Planning to build into fix for open-telemetry#3720.

Petrie commented Feb 13, 2023

How about moving the instrumentCache construction into the newResolver() function?
After that, every pipeline in the resolver owns an independent instrumentCache.

In the newMeter() method, remove the instrumentCache construction:

func newMeter(s instrumentation.Scope, p pipelines) *meter {
	// viewCache ensures instrument conflicts, including number conflicts, this
	// meter is asked to create are logged to the user.
	var viewCache cache[string, instrumentID]

	return &meter{
		scope:     s,
		pipes:     p,
		int64IP:   newInstProvider[int64](s, p, &viewCache),
		float64IP: newInstProvider[float64](s, p, &viewCache),
	}
}

Add the instrumentCache construction to newResolver:

func newResolver[N int64 | float64](p pipelines, vc *cache[string, instrumentID]) resolver[N] {
	in := make([]*inserter[N], len(p))
	for i := range in {
		c := newInstrumentCache[N](nil, vc)
		in[i] = newInserter(p[i], c)
	}
	return resolver[N]{in}
}

I have already tested this change; it works with multiple readers.


MrAlias added a commit to MrAlias/opentelemetry-go that referenced this issue Feb 14, 2023
Planning to build into fix for open-telemetry#3720.
MrAlias added a commit to MrAlias/opentelemetry-go that referenced this issue Feb 14, 2023
Planning to build into fix for open-telemetry#3720.

MrAlias commented Feb 14, 2023

That instrument cache needs to be scoped across pipelines. Otherwise multiple aggregators will be used to calculate the same aggregation.

I have a fix that adds an additional cache to the resolver and replaces the instrument provider with the resolvers.

I think I might have misspoken. The instrument cache needs to be scoped so that duplicate aggregators are not used to calculate the same aggregation, but I think you were correct in stating that there should be independent aggregators per reader.

I'm updating #3724
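
A hypothetical sketch of that scoping, with simplified stand-in types (not the SDK's actual definitions): each reader's inserter owns its own cache, so the same instrument yields one aggregator per reader rather than one aggregator shared by all readers or a new aggregator for every request:

package main

import "fmt"

// aggregatorStub stands in for an SDK aggregator.
type aggregatorStub struct{ instrument string }

// readerInserter is a per-reader inserter: its cache is not shared with
// other readers, and it adds each resolved aggregator to its pipeline once.
type readerInserter struct {
	cache    map[string]*aggregatorStub
	pipeline []*aggregatorStub
}

func (r *readerInserter) resolve(instrument string) *aggregatorStub {
	if agg, ok := r.cache[instrument]; ok {
		return agg // same reader, same instrument: reuse the aggregator
	}
	agg := &aggregatorStub{instrument: instrument}
	r.cache[instrument] = agg
	r.pipeline = append(r.pipeline, agg) // added exactly once per reader
	return agg
}

func main() {
	readers := []*readerInserter{
		{cache: map[string]*aggregatorStub{}},
		{cache: map[string]*aggregatorStub{}},
	}
	for _, r := range readers {
		r.resolve("foo")
		r.resolve("foo")                                  // cache hit within this reader
		fmt.Println("pipeline length:", len(r.pipeline)) // 1 for each reader
	}
}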

MrAlias added a commit to MrAlias/opentelemetry-go that referenced this issue Feb 15, 2023
The current pipeline resolution path will only add the resolved
aggregators to the pipeline when it creates one (cache miss). It will
not add it if there is a cache hit. This means (since we cache
instruments at the meter level, not the pipeline level) the first reader
in a multiple-reader setup is the only one that will collect data for
that aggregator. All other readers will have a cache hit and nothing is
added to the pipeline. This is causing open-telemetry#3720.

This resolves open-telemetry#3720 by moving the instrument caching into the inserter.
This means aggregators are cached at the reader level, not the meter.
MrAlias added a commit that referenced this issue Feb 21, 2023
* Merge instrument cache to inserter

The current pipeline resolution path will only add the resolved
aggregators to the pipeline when it creates one (cache miss). It will
not add it if there is a cache hit. This means (since we cache
instruments at the meter level, not the pipeline level) the first reader
in a multiple-reader setup is the only one that will collect data for
that aggregator. All other readers will have a cache hit and nothing is
added to the pipeline. This is causing #3720.

This resolves #3720 by moving the instrument caching into the inserter.
This means aggregators are cached at the reader level, not the meter.

* Rename aggCV to aggVal

---------

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>