Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Add option to provide resource based on metric descriptor. (#231)
Browse files Browse the repository at this point in the history
* Add option to provide resource based on metric descriptor.

* change option to accomodate removing labels that are used for the resource

* fix doc and review comments.

* fix review comment.
  • Loading branch information
rghetia committed Oct 29, 2019
1 parent 8033da9 commit 9366d36
Show file tree
Hide file tree
Showing 3 changed files with 345 additions and 1 deletion.
17 changes: 16 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
)
Expand Down Expand Up @@ -154,12 +155,26 @@ func (se *statsExporter) metricToMpbTs(ctx context.Context, metric *metricdata.M
// TODO: (@rghetia) perhaps log this error from labels extraction, if non-nil.
continue
}

var rsc *monitoredrespb.MonitoredResource
var mr monitoredresource.Interface
if se.o.ResourceByDescriptor != nil {
labels, mr = se.o.ResourceByDescriptor(&metric.Descriptor, labels)
// TODO(rghetia): optimize this. It is inefficient to convert this for all metrics.
rsc = convertMonitoredResourceToPB(mr)
if rsc.Type == "" {
rsc.Type = "global"
rsc.Labels = nil
}
} else {
rsc = resource
}
timeSeries = append(timeSeries, &monitoringpb.TimeSeries{
Metric: &googlemetricpb.Metric{
Type: metricType,
Labels: labels,
},
Resource: resource,
Resource: rsc,
Points: sdPoints,
})
}
Expand Down
311 changes: 311 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"contrib.go.opencensus.io/exporter/stackdriver/monitoredresource"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/resource"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -559,3 +560,313 @@ func TestMetricsToMonitoringMetrics_fromProtoPoint(t *testing.T) {
}
}
}

func TestResourceByDescriptor(t *testing.T) {
startTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
}
startTime, _ := ptypes.Timestamp(startTimestamp)
endTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
}
endTime, _ := ptypes.Timestamp(endTimestamp)

tests := []struct {
in *metricdata.Metric
want []*monitoringpb.CreateTimeSeriesRequest
wantErr string
}{
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_one",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k11",
},
{
Key: "k12",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v11",
},
{
Value: "v12",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_one",
Labels: map[string]string{
"k12": "v12",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "one",
Labels: map[string]string{
"k11": "v11",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_two",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k21",
},
{
Key: "k22",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v21",
},
{
Value: "v22",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_two",
Labels: map[string]string{
"k21": "v21",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "two",
Labels: map[string]string{
"k22": "v22",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
{
in: &metricdata.Metric{
Descriptor: metricdata.Descriptor{
Name: "custom_resource_other",
Description: "This is a test",
Unit: metricdata.UnitBytes,
Type: metricdata.TypeCumulativeInt64,
LabelKeys: []metricdata.LabelKey{
{
Key: "k31",
},
{
Key: "k32",
},
},
},
Resource: nil,
TimeSeries: []*metricdata.TimeSeries{
{
StartTime: startTime,
Points: []metricdata.Point{
{
Time: endTime,
Value: int64(5),
},
},
LabelValues: []metricdata.LabelValue{
{
Value: "v31",
},
{
Value: "v32",
},
},
},
},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/custom_resource_other",
Labels: map[string]string{
"k31": "v31",
"k32": "v32",
},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "global",
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 5,
},
},
},
},
},
},
},
},
},
}

var se = &statsExporter{
o: Options{
ProjectID: "foo",
ResourceByDescriptor: getResourceByDescriptor,
},
}

for i, tt := range tests {
tsl, err := se.metricToMpbTs(context.Background(), tt.in)
if tt.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
}
continue
}
if err != nil {
t.Errorf("#%d: unexpected error: %v", i, err)
continue
}

got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
// Our saving grace is serialization equality since some
// unexported fields could be present in the various values.
if diff := cmpTSReqs(got, tt.want); diff != "" {
t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff)
}
}
}

type customResource struct {
rt string
rm map[string]string
}

var _ monitoredresource.Interface = (*customResource)(nil)

func (cr *customResource) MonitoredResource() (resType string, labels map[string]string) {
return cr.rt, cr.rm
}

var crEmpty = &customResource{rt: ""}

func getResourceByDescriptor(md *metricdata.Descriptor, labels map[string]string) (map[string]string, monitoredresource.Interface) {
switch md.Name {
case "custom_resource_one":
cr := &customResource{
rt: "one",
rm: map[string]string{
"k11": labels["k11"],
},
}
newLabels := removeLabel(labels, cr.rm)
return newLabels, cr
case "custom_resource_two":
cr := &customResource{
rt: "two",
rm: map[string]string{
"k22": labels["k22"],
},
}
newLabels := removeLabel(labels, cr.rm)
return newLabels, cr
default:
return labels, crEmpty
}
}

func removeLabel(m map[string]string, remove map[string]string) map[string]string {
newM := make(map[string]string)
for k, v := range m {
if _, ok := remove[k]; !ok {
newM[k] = v
}
}
return newM
}
18 changes: 18 additions & 0 deletions stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,24 @@ type Options struct {
// to Stackdriver Monitoring. This is only used for Proto metrics export
// for now. The minimum number of workers is 1.
NumberOfWorkers int

// ResourceByDescriptor may be provided to supply monitored resource dynamically
// based on the metric Descriptor. Most users will not need to set this,
// but should instead set ResourceDetector.
//
// The MonitoredResource and ResourceDetector fields are ignored if this
// field is set to a non-nil value.
//
// The ResourceByDescriptor is called to derive monitored resources from
// metric.Descriptor and the label map associated with the time-series.
// If any label is used for the derived resource then it will be removed
// from the label map. The remaining labels in the map are returned to
// be used with the time-series.
//
// If the func set to this field does not return valid resource even for one
// time-series then it will result into an error for the entire CreateTimeSeries request
// which may contain more than one time-series.
ResourceByDescriptor func(*metricdata.Descriptor, map[string]string) (map[string]string, monitoredresource.Interface)
}

const defaultTimeout = 5 * time.Second
Expand Down

0 comments on commit 9366d36

Please sign in to comment.