Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
transform resources for metrics exported via ExportMetricsProto api (#…
Browse files Browse the repository at this point in the history
…158)

* transform resources for metrics exported via ExportMetricsProto interface.

* added test case for per metric resource.
  • Loading branch information
rghetia committed May 31, 2019
1 parent 69e294b commit 5d4eb99
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 76 deletions.
3 changes: 2 additions & 1 deletion equivalence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
// Now perform some exporting.
for i, vd := range vdl {
se := &statsExporter{
o: Options{ProjectID: "equivalence"},
o: Options{ProjectID: "equivalence", MapResource: defaultMapResource},
}

ctx := context.Background()
Expand Down Expand Up @@ -143,6 +143,7 @@ func TestEquivalenceStatsVsMetricsUploads(t *testing.T) {
// so that batching is performed deterministically and flushing is
// fully controlled by us.
BundleDelayThreshold: 2 * time.Hour,
MapResource: defaultMapResource,
}
se, err := newStatsExporter(exporterOptions)
if err != nil {
Expand Down
45 changes: 21 additions & 24 deletions metrics_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ import (
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/resource"
)

var errNilMetric = errors.New("expecting a non-nil metric")
Expand Down Expand Up @@ -328,6 +328,23 @@ func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monito
return ctsreql
}

func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
if rsc == nil {
return &resource.Resource{
Type: "global",
}
}
res := &resource.Resource{
Type: rsc.Type,
Labels: make(map[string]string, len(rsc.Labels)),
}

for k, v := range rsc.Labels {
res.Labels[k] = v
}
return res
}

// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
// but it doesn't invoke any remote API.
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) {
Expand All @@ -340,6 +357,8 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *comm
resource = metric.Resource
}

mappedRes := se.o.MapResource(resourcepbToResource(resource))

metricName, _, _, err := metricProseFromProto(metric)
if err != nil {
return nil, err
Expand Down Expand Up @@ -367,7 +386,7 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *comm
Type: metricType,
Labels: labels,
},
Resource: protoResourceToMonitoredResource(resource),
Resource: mappedRes,
Points: sdPoints,
})
}
Expand Down Expand Up @@ -686,28 +705,6 @@ func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.
}
}

func protoResourceToMonitoredResource(rsp *resourcepb.Resource) *monitoredrespb.MonitoredResource {
if rsp == nil {
return &monitoredrespb.MonitoredResource{
Type: "global",
}
}
typ := rsp.Type
if typ == "" {
typ = "global"
}
mrsp := &monitoredrespb.MonitoredResource{
Type: typ,
}
if rsp.Labels != nil {
mrsp.Labels = make(map[string]string, len(rsp.Labels))
for k, v := range rsp.Labels {
mrsp.Labels[k] = v
}
}
return mrsp
}

func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue {
taskValue := fmt.Sprintf("%s-%d@%s", strings.ToLower(node.LibraryInfo.GetLanguage().String()), node.Identifier.Pid, node.Identifier.HostName)
return map[string]labelValue{
Expand Down
231 changes: 184 additions & 47 deletions metrics_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"cloud.google.com/go/monitoring/apiv3"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/golang/protobuf/ptypes/timestamp"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
Expand All @@ -30,56 +31,11 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"go.opencensus.io/resource/resourcekeys"
)

func TestProtoResourceToMonitoringResource(t *testing.T) {
tests := []struct {
in *resourcepb.Resource
want *monitoredrespb.MonitoredResource
}{
{in: nil, want: &monitoredrespb.MonitoredResource{Type: "global"}},
{in: &resourcepb.Resource{}, want: &monitoredrespb.MonitoredResource{Type: "global"}},
{
in: &resourcepb.Resource{
Type: "foo",
},
want: &monitoredrespb.MonitoredResource{
Type: "foo",
},
},
{
in: &resourcepb.Resource{
Type: "foo",
Labels: map[string]string{},
},
want: &monitoredrespb.MonitoredResource{
Type: "foo",
Labels: map[string]string{},
},
},
{
in: &resourcepb.Resource{
Type: "foo",
Labels: map[string]string{"a": "A"},
},
want: &monitoredrespb.MonitoredResource{
Type: "foo",
Labels: map[string]string{"a": "A"},
},
},
}

for i, tt := range tests {
got := protoResourceToMonitoredResource(tt.in)
if diff := cmpResource(got, tt.want); diff != "" {
t.Fatalf("Test %d failed. Unexpected Resource -got +want: %s", i, diff)
}
}
}

func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
startTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Expand Down Expand Up @@ -133,7 +89,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
},
},
statsExporter: &statsExporter{
o: Options{ProjectID: "foo"},
o: Options{ProjectID: "foo", MapResource: defaultMapResource},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Expand Down Expand Up @@ -205,6 +161,187 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
}
}

func TestProtoMetricWithDifferentResource(t *testing.T) {
startTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
}
endTimestamp := &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
}

tests := []struct {
in *metricspb.Metric
want []*monitoringpb.CreateTimeSeriesRequest
wantErr string
statsExporter *statsExporter
}{
{
in: &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "with_k8s_resource",
Description: "This is a test",
Unit: "By",
},
Resource: &resourcepb.Resource{
Type: resourcekeys.K8SType,
Labels: map[string]string{
resourcekeys.K8SKeyClusterName: "cluster1",
resourcekeys.K8SKeyPodName: "pod1",
resourcekeys.K8SKeyNamespaceName: "namespace1",
resourcekeys.ContainerKeyName: "container-name1",
resourcekeys.CloudKeyZone: "zone1",
},
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: startTimestamp,
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Value: &metricspb.Point_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
statsExporter: &statsExporter{
o: Options{ProjectID: "foo", MapResource: defaultMapResource},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/with_k8s_resource",
Labels: map[string]string{},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "k8s_container",
Labels: map[string]string{
"location": "zone1",
"cluster_name": "cluster1",
"namespace_name": "namespace1",
"pod_name": "pod1",
"container_name": "container-name1",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
},
},
},
{
in: &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "with_gce_resource",
Description: "This is a test",
Unit: "By",
},
Resource: &resourcepb.Resource{
Type: resourcekeys.CloudType,
Labels: map[string]string{
resourcekeys.CloudKeyProvider: resourcekeys.CloudProviderGCP,
resourcekeys.HostKeyID: "inst1",
resourcekeys.CloudKeyZone: "zone1",
},
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: startTimestamp,
Points: []*metricspb.Point{
{
Timestamp: endTimestamp,
Value: &metricspb.Point_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
statsExporter: &statsExporter{
o: Options{ProjectID: "foo", MapResource: defaultMapResource},
},
want: []*monitoringpb.CreateTimeSeriesRequest{
{
Name: "projects/foo",
TimeSeries: []*monitoringpb.TimeSeries{
{
Metric: &googlemetricpb.Metric{
Type: "custom.googleapis.com/opencensus/with_gce_resource",
Labels: map[string]string{},
},
Resource: &monitoredrespb.MonitoredResource{
Type: "gce_instance",
Labels: map[string]string{
"instance_id": "inst1",
"zone": "zone1",
},
},
Points: []*monitoringpb.Point{
{
Interval: &monitoringpb.TimeInterval{
StartTime: startTimestamp,
EndTime: endTimestamp,
},
Value: &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: 1,
},
},
},
},
},
},
},
},
},
}

for i, tt := range tests {
se := tt.statsExporter
if se == nil {
se = new(statsExporter)
}
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil)
if tt.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
}
continue
}
if err != nil {
t.Errorf("#%d: unexpected error: %v", i, err)
continue
}

got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
// Our saving grace is serialization equality since some
// unexported fields could be present in the various values.
if diff := cmpTSReqs(got, tt.want); diff != "" {
t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff)
}
}
}

func TestProtoToMonitoringMetricDescriptor(t *testing.T) {
tests := []struct {
in *metricspb.Metric
Expand Down

0 comments on commit 5d4eb99

Please sign in to comment.