Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to latest opentelemtry #173

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmdutil/metrics/otel/otel.go
Expand Up @@ -7,7 +7,8 @@ import (
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/export/metric"

"go.opentelemetry.io/otel/sdk/metric/export/aggregation"

"github.com/heroku/x/go-kit/metrics"
"github.com/heroku/x/go-kit/metrics/provider/otel"
Expand All @@ -24,7 +25,7 @@ func MustProvider(ctx context.Context, logger logrus.FieldLogger, cfg Config, se
}

client := otel.NewHTTPClient(*cfg.CollectorURL)
expOpts := otlpmetric.WithMetricExportKindSelector(metric.DeltaExportKindSelector())
expOpts := otlpmetric.WithMetricAggregationTemporalitySelector(aggregation.DeltaTemporalitySelector())
exporter := otlpmetric.NewUnstarted(client, expOpts)

attrs := []attribute.KeyValue{}
Expand Down
9 changes: 5 additions & 4 deletions go-kit/metrics/provider/otel/options.go
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"

Expand Down Expand Up @@ -37,7 +38,7 @@ var WithDefaultAggregator = WithExactAggregator
//
// NOTE: simple.NewWithExactDistribution is removed in go.opentelemetry.io/otel/sdk/metric@v0.26.0.
func WithExactAggregator() Option {
return WithAggregator(simple.NewWithExactDistribution())
return WithAggregator(simple.NewWithHistogramDistribution())
}

// WithExplicitHistogramAggregator initializes the Provider with with our custom explicit.NewExplicitHistogramSelector.
Expand Down Expand Up @@ -133,7 +134,7 @@ func WithEndpointExporter(endpoint string) Option {
}

// WithExporter initializes the Provider with an exporter.
func WithExporter(exp exporter) Option {
func WithExporter(exp *otlpmetric.Exporter) Option {
return func(p *Provider) error {
if exp == nil {
return ErrExporterNil
Expand All @@ -153,11 +154,11 @@ func WithCollectPeriod(collectPeriod time.Duration) Option {

// defaultExporter returns a new otlp exporter that uses a gRPC driver.
// A collector agent endpoint (host:port) is required as the addr.
func defaultExporter(addr string) exporter {
func defaultExporter(addr string) *otlpmetric.Exporter {
c := otlpmetricgrpc.NewClient(
otlpmetricgrpc.WithEndpoint(addr),
otlpmetricgrpc.WithInsecure(),
)
eo := otlpmetric.WithMetricExportKindSelector(metric.DeltaExportKindSelector())
eo := otlpmetric.WithMetricAggregationTemporalitySelector(aggregation.DeltaTemporalitySelector())
return otlpmetric.NewUnstarted(c, eo)
}
82 changes: 42 additions & 40 deletions go-kit/metrics/provider/otel/provider.go
Expand Up @@ -10,13 +10,18 @@ import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
metricexport "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
metriccontroller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"

xmetrics "github.com/heroku/x/go-kit/metrics"
Expand Down Expand Up @@ -56,8 +61,8 @@ type Provider struct {
serviceNameResource *resource.Resource
optionCache explicit.OptionCache
selector metricexport.AggregatorSelector
exporter exporter
controller controller
exporter *otlpmetric.Exporter
controller *metriccontroller.Controller

defaultTags []string
prefix string
Expand Down Expand Up @@ -103,27 +108,15 @@ func New(ctx context.Context, serviceName string, opts ...Option) (xmetrics.Prov

// initialize the controller
p.controller = metriccontroller.New(
processor.New(p.selector, p.exporter),
processor.NewFactory(simple.NewWithHistogramDistribution(), p.exporter),
metriccontroller.WithExporter(p.exporter),
metriccontroller.WithResource(p.serviceNameResource),
metriccontroller.WithCollectPeriod(p.collectPeriod),
)
global.SetMeterProvider(p.controller.MeterProvider())

return &p, nil
}

type exporter interface {
Start(ctx context.Context) error
Shutdown(ctx context.Context) error
Export(parent context.Context, resource *resource.Resource, cps metricexport.CheckpointSet) error
ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricexport.ExportKind
}
global.SetMeterProvider(p.controller)

type controller interface {
MeterProvider() metric.MeterProvider
Start(ctx context.Context) error
Stop(ctx context.Context) error
return &p, nil
}

// Start starts the provider's controller and exporter.
Expand Down Expand Up @@ -167,7 +160,7 @@ func (p *Provider) Meter(name string) metric.Meter {

// Counter is a counter.
type Counter struct {
metric.Float64Counter
syncfloat64.UpDownCounter
name string
labels []string
attributes []attribute.KeyValue
Expand All @@ -176,7 +169,7 @@ type Counter struct {

// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.Float64Counter.Add(c.p.ctx, delta, c.attributes...)
c.UpDownCounter.Add(c.p.ctx, delta, c.attributes...)
}

// With implements metrics.Counter.
Expand All @@ -198,14 +191,17 @@ func (p *Provider) newCounter(name string, labelValues ...string) metrics.Counte
m := p.Meter(name)

if _, ok := p.counters[k]; !ok {
c := metric.Must(m).NewFloat64Counter(name)
c, err := m.SyncFloat64().UpDownCounter(name)
if err != nil {
// TODO: handle error
}

p.counters[k] = &Counter{
Float64Counter: c,
labels: labelValues,
attributes: makeAttributes(labelValues),
p: p,
name: name,
UpDownCounter: c,
labels: labelValues,
attributes: makeAttributes(labelValues),
p: p,
name: name,
}
}

Expand All @@ -214,8 +210,8 @@ func (p *Provider) newCounter(name string, labelValues ...string) metrics.Counte

// Gauge is a gauge.
type Gauge struct {
*generic.Gauge
observer *metric.Float64GaugeObserver
Gauge *generic.Gauge
observer asyncfloat64.Gauge
name string
labels []string
attributes []attribute.KeyValue
Expand All @@ -239,15 +235,18 @@ func (p *Provider) newGauge(name string, labelValues ...string) metrics.Gauge {
if _, ok := p.gauges[k]; !ok {
gg := generic.NewGauge(name)

callback := func(ctx context.Context, result metric.Float64ObserverResult) {
result.Observe(gg.Value(), attributes...)
g, err := m.AsyncFloat64().Gauge(name)
if err != nil {
// TODO: handle error
}

g := metric.Must(m).NewFloat64GaugeObserver(name, callback)
m.RegisterCallback([]instrument.Asynchronous{g}, func(ctx context.Context) {
g.Observe(ctx, gg.Value())
})

p.gauges[k] = &Gauge{
Gauge: gg,
observer: &g,
observer: g,
labels: labelValues,
attributes: attributes,
name: name,
Expand Down Expand Up @@ -276,7 +275,7 @@ func (g *Gauge) Add(delta float64) {

// Histogram is a histogram.
type Histogram struct {
metric.Float64Histogram
syncfloat64.Histogram
name string
labels []string
attributes []attribute.KeyValue
Expand Down Expand Up @@ -307,14 +306,17 @@ func (p *Provider) newHistogram(name string, labelValues ...string) metrics.Hist
m := p.Meter(name)

if _, ok := p.histograms[k]; !ok {
h := metric.Must(m).NewFloat64Histogram(name)
h, err := m.SyncFloat64().Histogram(name)
if err != nil {
// TODO: handle error
}

p.histograms[k] = &Histogram{
Float64Histogram: h,
name: name,
labels: labelValues,
attributes: makeAttributes(labelValues),
p: p,
Histogram: h,
name: name,
labels: labelValues,
attributes: makeAttributes(labelValues),
p: p,
}
}

Expand All @@ -329,7 +331,7 @@ func (h *Histogram) With(labelValues ...string) metrics.Histogram {

// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.Float64Histogram.Record(h.p.ctx, value, h.attributes...)
h.Histogram.Record(h.p.ctx, value, h.attributes...)
}

// NewCardinalityCounter implements metrics.Provider.
Expand Down
5 changes: 2 additions & 3 deletions go-kit/metrics/provider/otel/selector/explicit/explicit.go
Expand Up @@ -3,9 +3,8 @@ package explicit
import (
"sync"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/sdkapi"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/sdkapi"

"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
Expand Down Expand Up @@ -52,7 +51,7 @@ func (c *selectorCache) Fetch(name string) []histogram.Option {
return c.opts[name]
}

func (s selectorHistogram) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
func (s selectorHistogram) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...*export.Aggregator) {
switch desc.InstrumentKind() {
case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs)
Expand Down
Expand Up @@ -4,12 +4,12 @@ import (
"reflect"
"testing"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/number"
"go.opentelemetry.io/otel/metric/sdkapi"
"go.opentelemetry.io/otel/metric/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/number"
"go.opentelemetry.io/otel/sdk/metric/sdkapi"
)

var (
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSelector_HistogramConfig(t *testing.T) {

func TestSelector_Histogram(t *testing.T) {
var aggregator export.Aggregator
desc := metric.NewDescriptor("test", sdkapi.HistogramInstrumentKind, number.Float64Kind)
desc := sdkapi.NewDescriptor("test", sdkapi.HistogramInstrumentKind, number.Float64Kind, "testing", unit.Milliseconds)

t.Run("no options", func(t *testing.T) {
selector, _ := NewExplicitHistogramDistribution()
Expand Down
31 changes: 17 additions & 14 deletions go.mod
Expand Up @@ -20,6 +20,7 @@ require (
github.com/google/uuid v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.0 // indirect
github.com/heroku/rollrus v0.2.0
github.com/hydrogen18/memlistener v0.0.0-20141126152155-54553eb933fb
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand All @@ -38,23 +39,25 @@ require (
github.com/spf13/pflag v1.0.1 // indirect
github.com/unrolled/secure v1.0.1
github.com/urfave/cli v1.21.0
go.opencensus.io v0.22.1
go.opentelemetry.io/otel v1.0.0-RC3
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.22.0
go.opentelemetry.io/otel/metric v0.23.0
go.opentelemetry.io/otel/sdk v1.0.0-RC3
go.opentelemetry.io/otel/sdk/export/metric v0.23.0
go.opentelemetry.io/otel/sdk/metric v0.23.0
go.opencensus.io v0.22.4
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.30.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.30.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.30.0
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/sdk/export/metric v0.28.0
go.opentelemetry.io/otel/sdk/metric v0.30.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20211102061401-a2f17f7b995c // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/sys v0.0.0-20220429121018-84afa8d3f7b3 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gonum.org/v1/gonum v0.0.0-20190502212712-4a2eb0188cbc // indirect
google.golang.org/grpc v1.40.0
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e // indirect
google.golang.org/grpc v1.46.0
google.golang.org/grpc/examples v0.0.0-20210916203835-567da6b86340
google.golang.org/protobuf v1.27.1
google.golang.org/protobuf v1.28.0
gopkg.in/caio/go-tdigest.v2 v2.3.0
gopkg.in/ini.v1 v1.42.0 // indirect
)