Skip to content

Commit

Permalink
Metrics exporter cache requests that fail with an unrecoverable error (
Browse files Browse the repository at this point in the history
…#768)

* cache CreateMetricDescriptor requests that fail with an non-recoverable error
---------

Co-authored-by: David Ashpole <dashpole@google.com>
  • Loading branch information
avilevy18 and dashpole committed Nov 13, 2023
1 parent 337c81d commit a8f42c7
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 4 deletions.
26 changes: 22 additions & 4 deletions exporter/collector/metrics.go
Expand Up @@ -79,7 +79,7 @@ type MetricsExporter struct {
mdCache map[string]*monitoringpb.CreateMetricDescriptorRequest
// A channel that receives metric descriptor and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
client *monitoring.MetricClient
client monitoringClient
// Only used for testing purposes in lieu of initializing a fake client
exportFunc func(context.Context, *monitoringpb.CreateTimeSeriesRequest) error
// requestOpts applies options to the context for requests, such as additional headers.
Expand Down Expand Up @@ -137,6 +137,15 @@ const (

type labels map[string]string

// monitoringClient is the subset of monitoring.MetricClient this exporter uses,
// and allows us to mock the implementation for testing.
type monitoringClient interface {
CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
CreateServiceTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
Close() error
CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
}

func (me *MetricsExporter) Shutdown(ctx context.Context) error {
// TODO: pass ctx to goroutines so that we can use its deadline
close(me.shutdownC)
Expand Down Expand Up @@ -471,8 +480,7 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at same read index if retryable (network) error
s := status.Convert(err)
if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
if isNotRecoverable(err) {
break
}
me.obs.log.Error("retryable error, retrying request")
Expand Down Expand Up @@ -655,6 +663,12 @@ func projectName(projectID string) string {
return fmt.Sprintf("projects/%s", projectID)
}

// isNotRecoverable returns true if the error is permanent.
func isNotRecoverable(err error) bool {
s := status.Convert(err)
return !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable)
}

// Helper method to send metric descriptors to GCM.
func (me *MetricsExporter) exportMetricDescriptor(req *monitoringpb.CreateMetricDescriptorRequest) {
cacheKey := fmt.Sprintf("%s/%s", req.Name, req.MetricDescriptor.Type)
Expand All @@ -669,12 +683,16 @@ func (me *MetricsExporter) exportMetricDescriptor(req *monitoringpb.CreateMetric
}
_, err := me.client.CreateMetricDescriptor(ctx, req)
if err != nil {
if isNotRecoverable(err) {
// cache if the error is non-recoverable
me.mdCache[cacheKey] = req
}
// TODO: Log-once on error, per metric descriptor?
me.obs.log.Error("Unable to send metric descriptor.", zap.Error(err), zap.Any("metric_descriptor", req.MetricDescriptor))
return
}

// only cache if we are successful. We want to retry if there is an error
// cache if we are successful
me.mdCache[cacheKey] = req
}

Expand Down
116 changes: 116 additions & 0 deletions exporter/collector/metrics_test.go
Expand Up @@ -24,12 +24,14 @@ import (

"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/wal"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
Expand Down Expand Up @@ -58,6 +60,120 @@ func newTestMetricMapper() (metricMapper, func()) {
}, func() { close(s) }
}

type mock struct {
monitoringClient
createMetricDescriptor func(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
}

func (m *mock) CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error) {
return m.createMetricDescriptor(ctx, req)
}

func TestExportCreateMetricDescriptorCache(t *testing.T) {
for _, tc := range []struct {
reqs []*monitoringpb.CreateMetricDescriptorRequest
desc string
createMetricDescriptorResponses []error
expectedTimesRequestCalled int
expectedTimesZapCalled int
}{
{
expectedTimesRequestCalled: 2,
expectedTimesZapCalled: 0,
desc: "valid metric descriptor gets created",
createMetricDescriptorResponses: []error{nil, nil},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "bar",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "baz",
},
},
},
},
{
expectedTimesRequestCalled: 1,
expectedTimesZapCalled: 1,
desc: "non-recoverable error",
createMetricDescriptorResponses: []error{
status.Error(codes.PermissionDenied, "permission denied"),
status.Error(codes.PermissionDenied, "permission denied"),
},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
},
},
{
expectedTimesRequestCalled: 2,
expectedTimesZapCalled: 1,
desc: "recoverable error",
createMetricDescriptorResponses: []error{
status.Error(codes.DeadlineExceeded, "deadline exceeded"),
nil,
},
reqs: []*monitoringpb.CreateMetricDescriptorRequest{
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
{
Name: "foo",
MetricDescriptor: &metricpb.MetricDescriptor{
Type: "goo",
},
},
},
},
} {
logger, observed := observer.New(zap.DebugLevel)

actualTimesCalled := 0
i := 0
m := &mock{
createMetricDescriptor: func(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error) {
actualTimesCalled++
err := tc.createMetricDescriptorResponses[i]
i++
return req.MetricDescriptor, err
},
}

me := MetricsExporter{
mdCache: make(map[string]*monitoringpb.CreateMetricDescriptorRequest),
obs: selfObservability{
log: zap.New(logger),
},
client: m,
}

for _, r := range tc.reqs {
me.exportMetricDescriptor(r)
}

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), tc.expectedTimesZapCalled)
require.Equal(t, tc.expectedTimesRequestCalled, actualTimesCalled)
}
}

func TestMetricToTimeSeries(t *testing.T) {
mr := &monitoredrespb.MonitoredResource{}

Expand Down

0 comments on commit a8f42c7

Please sign in to comment.