Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the UpDownSumObserver instrument #750

Merged
merged 5 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/metric/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ func TestObserverInstruments(t *testing.T) {
-142,
)
})
t.Run("float updownsumobserver", func(t *testing.T) {
labels := []kv.KeyValue{kv.String("O", "P")}
mockSDK, meter := mockTest.NewMeter()
o := Must(meter).RegisterFloat64UpDownSumObserver("test.updownsumobserver.float", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(42.1, labels...)
})
mockSDK.RunAsyncInstruments()
checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(),
42.1,
)
})
t.Run("int updownsumobserver", func(t *testing.T) {
labels := []kv.KeyValue{}
mockSDK, meter := mockTest.NewMeter()
o := Must(meter).RegisterInt64UpDownSumObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-142, labels...)
})
mockSDK.RunAsyncInstruments()
checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(),
-142,
)
})
}

func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) {
Expand Down
12 changes: 12 additions & 0 deletions api/metric/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,15 @@ func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64Su
common, err := checkNewAsync(asyncInst, err)
return Float64SumObserver{asyncInstrument: common}, err
}

// wrapInt64UpDownSumObserverInstrument converts an AsyncImpl into Int64UpDownSumObserver.
func wrapInt64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Int64UpDownSumObserver, error) {
common, err := checkNewAsync(asyncInst, err)
return Int64UpDownSumObserver{asyncInstrument: common}, err
}

// wrapFloat64UpDownSumObserverInstrument converts an AsyncImpl into Float64UpDownSumObserver.
func wrapFloat64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Float64UpDownSumObserver, error) {
common, err := checkNewAsync(asyncInst, err)
return Float64UpDownSumObserver{asyncInstrument: common}, err
}
2 changes: 2 additions & 0 deletions api/metric/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ const (

// SumObserverKind indicates a SumObserver instrument.
SumObserverKind
// UpDownSumObserverKind indicates a UpDownSumObserver instrument.
UpDownSumObserverKind
)
5 changes: 3 additions & 2 deletions api/metric/kind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions api/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,32 @@ func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverC
newFloat64AsyncRunner(callback)))
}

// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument
// with the given name, running a given callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (m Meter) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64UpDownSumObserver, error) {
if callback == nil {
return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapInt64UpDownSumObserverInstrument(
m.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts,
newInt64AsyncRunner(callback)))
}

// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with
// the given name, running a given callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (m Meter) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64UpDownSumObserver, error) {
if callback == nil {
return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapFloat64UpDownSumObserverInstrument(
m.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts,
newFloat64AsyncRunner(callback)))
}

// RegisterInt64ValueObserver creates a new integer ValueObserver instrument
// with the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
Expand Down Expand Up @@ -220,6 +246,31 @@ func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) (
b.runner))
}

// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument
// with the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (b BatchObserver) RegisterInt64UpDownSumObserver(name string, opts ...Option) (Int64UpDownSumObserver, error) {
if b.runner == nil {
return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapInt64UpDownSumObserverInstrument(
b.meter.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, b.runner))
}

// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with
// the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (b BatchObserver) RegisterFloat64UpDownSumObserver(name string, opts ...Option) (Float64UpDownSumObserver, error) {
if b.runner == nil {
return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapFloat64UpDownSumObserverInstrument(
b.meter.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts,
b.runner))
}

// MeterImpl returns the underlying MeterImpl of this Meter.
func (m Meter) MeterImpl() MeterImpl {
return m.impl
Expand Down
40 changes: 40 additions & 0 deletions api/metric/must.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,26 @@ func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64Obse
}
}

// RegisterInt64UpDownSumObserver calls `Meter.RegisterInt64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (mm MeterMust) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64UpDownSumObserver {
if inst, err := mm.meter.RegisterInt64UpDownSumObserver(name, callback, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// RegisterFloat64UpDownSumObserver calls `Meter.RegisterFloat64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (mm MeterMust) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64UpDownSumObserver {
if inst, err := mm.meter.RegisterFloat64UpDownSumObserver(name, callback, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// NewBatchObserver returns a wrapper around BatchObserver that panics
// when any instrument constructor returns an error.
func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust {
Expand Down Expand Up @@ -180,3 +200,23 @@ func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Optio
return inst
}
}

// RegisterInt64UpDownSumObserver calls `BatchObserver.RegisterInt64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterInt64UpDownSumObserver(name string, oos ...Option) Int64UpDownSumObserver {
if inst, err := bm.batch.RegisterInt64UpDownSumObserver(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// RegisterFloat64UpDownSumObserver calls `BatchObserver.RegisterFloat64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterFloat64UpDownSumObserver(name string, oos ...Option) Float64UpDownSumObserver {
if inst, err := bm.batch.RegisterFloat64UpDownSumObserver(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}
34 changes: 34 additions & 0 deletions api/metric/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type Float64SumObserver struct {
asyncInstrument
}

// Int64UpDownSumObserver is a metric that captures a precomputed sum of
// int64 values at a point in time.
type Int64UpDownSumObserver struct {
asyncInstrument
}

// Float64UpDownSumObserver is a metric that captures a precomputed sum of
// float64 values at a point in time.
type Float64UpDownSumObserver struct {
asyncInstrument
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
Expand Down Expand Up @@ -88,3 +100,25 @@ func (f Float64SumObserver) Observation(v float64) Observation {
instrument: f.instrument,
}
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
// users should not refer to this.
func (i Int64UpDownSumObserver) Observation(v int64) Observation {
return Observation{
number: NewInt64Number(v),
instrument: i.instrument,
}
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
// users should not refer to this.
func (f Float64UpDownSumObserver) Observation(v float64) Observation {
return Observation{
number: NewFloat64Number(v),
instrument: f.instrument,
}
}
91 changes: 79 additions & 12 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,45 @@ func TestObserverCollection(t *testing.T) {
result.Observe(1)
})

_ = Must(meter).RegisterFloat64UpDownSumObserver("float.updownsumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
result.Observe(-2, kv.String("A", "B"))
result.Observe(1, kv.String("C", "D"))
})
_ = Must(meter).RegisterInt64UpDownSumObserver("int.updownsumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(2, kv.String("A", "B"))
result.Observe(1)
// last value wins
result.Observe(1, kv.String("A", "B"))
result.Observe(-1)
})

_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) {
})

collected := sdk.Collect(ctx)

require.Equal(t, 8, collected)
require.Equal(t, 8, len(integrator.records))
require.Equal(t, collected, len(integrator.records))

out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.sumobserver/A=B/R=V": 2,
"float.sumobserver/C=D/R=V": 1,
"int.sumobserver//R=V": 1,
"int.sumobserver/A=B/R=V": 1,
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,

"float.sumobserver/A=B/R=V": 2,
"float.sumobserver/C=D/R=V": 1,
"int.sumobserver//R=V": 1,
"int.sumobserver/A=B/R=V": 1,

"float.updownsumobserver/A=B/R=V": -2,
"float.updownsumobserver/C=D/R=V": 1,
"int.updownsumobserver//R=V": -1,
"int.updownsumobserver/A=B/R=V": 1,
}, out.Map)
}

Expand Down Expand Up @@ -405,6 +423,8 @@ func TestObserverBatch(t *testing.T) {
var intValueObs metric.Int64ValueObserver
var floatSumObs metric.Float64SumObserver
var intSumObs metric.Int64SumObserver
var floatUpDownSumObs metric.Float64UpDownSumObserver
var intUpDownSumObs metric.Int64UpDownSumObserver

var batch = Must(meter).NewBatchObserver(
func(_ context.Context, result metric.BatchObserverResult) {
Expand All @@ -418,41 +438,52 @@ func TestObserverBatch(t *testing.T) {
intValueObs.Observation(1),
floatSumObs.Observation(1000),
intSumObs.Observation(100),
floatUpDownSumObs.Observation(-1000),
intUpDownSumObs.Observation(-100),
)
result.Observe(
[]kv.KeyValue{
kv.String("C", "D"),
},
floatValueObs.Observation(-1),
floatSumObs.Observation(-1),
floatUpDownSumObs.Observation(-1),
)
result.Observe(
nil,
intValueObs.Observation(1),
intValueObs.Observation(1),
intSumObs.Observation(10),
floatSumObs.Observation(1.1),
intUpDownSumObs.Observation(10),
)
})
floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver")
intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver")
floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver")
intSumObs = batch.RegisterInt64SumObserver("int.sumobserver")
floatUpDownSumObs = batch.RegisterFloat64UpDownSumObserver("float.updownsumobserver")
intUpDownSumObs = batch.RegisterInt64UpDownSumObserver("int.updownsumobserver")

collected := sdk.Collect(ctx)

require.Equal(t, 8, collected)
require.Equal(t, 8, len(integrator.records))
require.Equal(t, collected, len(integrator.records))

out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.sumobserver//R=V": 1.1,
"float.sumobserver/A=B/R=V": 1000,
"int.sumobserver//R=V": 10,
"int.sumobserver/A=B/R=V": 100,
"float.sumobserver//R=V": 1.1,
"float.sumobserver/A=B/R=V": 1000,
"int.sumobserver//R=V": 10,
"int.sumobserver/A=B/R=V": 100,

"int.updownsumobserver/A=B/R=V": -100,
"float.updownsumobserver/A=B/R=V": -1000,
"int.updownsumobserver//R=V": 10,
"float.updownsumobserver/C=D/R=V": -1,

"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
Expand Down Expand Up @@ -515,6 +546,42 @@ func TestRecordPersistence(t *testing.T) {
require.Equal(t, int64(2), integrator.newAggCount)
}

func TestIncorrectInstruments(t *testing.T) {
// The Batch observe/record APIs are susceptible to
// uninitialized instruments.
var counter metric.Int64Counter
var observer metric.Int64ValueObserver

ctx := context.Background()
meter, sdk, integrator := newSDK(t)

// Now try with uninitialized instruments.
meter.RecordBatch(ctx, nil, counter.Measurement(1))
meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
result.Observe(nil, observer.Observation(1))
})

collected := sdk.Collect(ctx)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, 0, collected)

// Now try with instruments from another SDK.
var noopMeter metric.Meter
counter = metric.Must(noopMeter).NewInt64Counter("counter")
observer = metric.Must(noopMeter).NewBatchObserver(
func(context.Context, metric.BatchObserverResult) {},
).RegisterInt64ValueObserver("observer")

meter.RecordBatch(ctx, nil, counter.Measurement(1))
meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
result.Observe(nil, observer.Observation(1))
})

collected = sdk.Collect(ctx)
require.Equal(t, 0, collected)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
}

func TestSyncInAsync(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
Expand Down