diff --git a/CHANGELOG.md b/CHANGELOG.md index e6d27b6b02d..0b81d70ccdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340) - Reenabled Attribute Filters in the Metric SDK. (#3396) - Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432) +- Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440) ## [1.11.1/0.33.0] 2022-10-19 diff --git a/exporters/otlp/otlpmetric/internal/otest/client.go b/exporters/otlp/otlpmetric/internal/otest/client.go index 35939664999..39002156a5c 100644 --- a/exporters/otlp/otlpmetric/internal/otest/client.go +++ b/exporters/otlp/otlpmetric/internal/otest/client.go @@ -16,6 +16,7 @@ package otest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/inte import ( "context" + "fmt" "testing" "time" @@ -24,9 +25,11 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/metric/unit" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" + collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" cpb "go.opentelemetry.io/proto/otlp/common/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" rpb "go.opentelemetry.io/proto/otlp/resource/v1" @@ -168,7 +171,10 @@ var ( // otlpmetric.Client implementation that is connected to also returned // Collector implementation. The Client is ready to upload metric data to the // Collector which is ready to store that data. -type ClientFactory func() (otlpmetric.Client, Collector) +// +// If resultCh is not nil, the returned Collector needs to use the responses +// from that channel to send back to the client for every export request. +type ClientFactory func(resultCh <-chan ExportResult) (otlpmetric.Client, Collector) // RunClientTests runs a suite of Client integration tests. For example: // @@ -177,17 +183,17 @@ func RunClientTests(f ClientFactory) func(*testing.T) { return func(t *testing.T) { t.Run("ClientHonorsContextErrors", func(t *testing.T) { t.Run("Shutdown", testCtxErrs(func() func(context.Context) error { - c, _ := f() + c, _ := f(nil) return c.Shutdown })) t.Run("ForceFlush", testCtxErrs(func() func(context.Context) error { - c, _ := f() + c, _ := f(nil) return c.ForceFlush })) t.Run("UploadMetrics", testCtxErrs(func() func(context.Context) error { - c, _ := f() + c, _ := f(nil) return func(ctx context.Context) error { return c.UploadMetrics(ctx, nil) } @@ -196,7 +202,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) { t.Run("ForceFlushFlushes", func(t *testing.T) { ctx := context.Background() - client, collector := f() + client, collector := f(nil) require.NoError(t, client.UploadMetrics(ctx, resourceMetrics)) require.NoError(t, client.ForceFlush(ctx)) @@ -211,7 +217,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) { t.Run("UploadMetrics", func(t *testing.T) { ctx := context.Background() - client, coll := f() + client, coll := f(nil) require.NoError(t, client.UploadMetrics(ctx, resourceMetrics)) require.NoError(t, client.Shutdown(ctx)) @@ -222,6 +228,51 @@ func RunClientTests(f ClientFactory) func(*testing.T) { t.Fatalf("unexpected ResourceMetrics:\n%s", diff) } }) + + t.Run("PartialSuccess", func(t *testing.T) { + const n, msg = 2, "bad data" + rCh := make(chan ExportResult, 3) + rCh <- ExportResult{ + Response: &collpb.ExportMetricsServiceResponse{ + PartialSuccess: &collpb.ExportMetricsPartialSuccess{ + RejectedDataPoints: n, + ErrorMessage: msg, + }, + }, + } + rCh <- ExportResult{ + Response: &collpb.ExportMetricsServiceResponse{ + PartialSuccess: &collpb.ExportMetricsPartialSuccess{ + // Should not be logged. + RejectedDataPoints: 0, + ErrorMessage: "", + }, + }, + } + rCh <- ExportResult{ + Response: &collpb.ExportMetricsServiceResponse{}, + } + + ctx := context.Background() + client, _ := f(rCh) + + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + + errs := []error{} + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) + + require.NoError(t, client.UploadMetrics(ctx, resourceMetrics)) + require.NoError(t, client.UploadMetrics(ctx, resourceMetrics)) + require.NoError(t, client.UploadMetrics(ctx, resourceMetrics)) + require.NoError(t, client.Shutdown(ctx)) + + require.Equal(t, 1, len(errs)) + want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n) + assert.ErrorContains(t, errs[0], want) + }) } } diff --git a/exporters/otlp/otlpmetric/internal/otest/client_test.go b/exporters/otlp/otlpmetric/internal/otest/client_test.go index e701d10b8db..427b68c4e8c 100644 --- a/exporters/otlp/otlpmetric/internal/otest/client_test.go +++ b/exporters/otlp/otlpmetric/internal/otest/client_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -28,6 +30,7 @@ import ( ) type client struct { + rCh <-chan ExportResult storage *Storage } @@ -47,6 +50,17 @@ func (c *client) UploadMetrics(ctx context.Context, rm *mpb.ResourceMetrics) err c.storage.Add(&cpb.ExportMetricsServiceRequest{ ResourceMetrics: []*mpb.ResourceMetrics{rm}, }) + if c.rCh != nil { + r := <-c.rCh + if r.Response != nil && r.Response.GetPartialSuccess() != nil { + msg := r.Response.GetPartialSuccess().GetErrorMessage() + n := r.Response.GetPartialSuccess().GetRejectedDataPoints() + if msg != "" || n > 0 { + otel.Handle(internal.MetricPartialSuccessError(n, msg)) + } + } + return r.Err + } return ctx.Err() } @@ -54,8 +68,8 @@ func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } func (c *client) Shutdown(ctx context.Context) error { return ctx.Err() } func TestClientTests(t *testing.T) { - factory := func() (otlpmetric.Client, Collector) { - c := &client{storage: NewStorage()} + factory := func(rCh <-chan ExportResult) (otlpmetric.Client, Collector) { + c := &client{rCh: rCh, storage: NewStorage()} return c, c } diff --git a/exporters/otlp/otlpmetric/internal/otest/collector.go b/exporters/otlp/otlpmetric/internal/otest/collector.go index f49355f9f39..50ebd8e013a 100644 --- a/exporters/otlp/otlpmetric/internal/otest/collector.go +++ b/exporters/otlp/otlpmetric/internal/otest/collector.go @@ -50,6 +50,11 @@ type Collector interface { Collect() *Storage } +type ExportResult struct { + Response *collpb.ExportMetricsServiceResponse + Err error +} + // Storage stores uploaded OTLP metric data in their proto form. type Storage struct { dataMu sync.Mutex @@ -86,7 +91,7 @@ type GRPCCollector struct { headers metadata.MD storage *Storage - errCh <-chan error + resultCh <-chan ExportResult listener net.Listener srv *grpc.Server } @@ -100,14 +105,14 @@ type GRPCCollector struct { // If errCh is not nil, the collector will respond to Export calls with errors // sent on that channel. This means that if errCh is not nil Export calls will // block until an error is received. -func NewGRPCCollector(endpoint string, errCh <-chan error) (*GRPCCollector, error) { +func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) { if endpoint == "" { endpoint = "localhost:0" } c := &GRPCCollector{ - storage: NewStorage(), - errCh: errCh, + storage: NewStorage(), + resultCh: resultCh, } var err error @@ -155,11 +160,14 @@ func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsSer c.headersMu.Unlock() } - var err error - if c.errCh != nil { - err = <-c.errCh + if c.resultCh != nil { + r := <-c.resultCh + if r.Response == nil { + return &collpb.ExportMetricsServiceResponse{}, r.Err + } + return r.Response, r.Err } - return &collpb.ExportMetricsServiceResponse{}, err + return &collpb.ExportMetricsServiceResponse{}, nil } var emptyExportMetricsServiceResponse = func() []byte { @@ -189,7 +197,7 @@ type HTTPCollector struct { headers http.Header storage *Storage - errCh <-chan error + resultCh <-chan ExportResult listener net.Listener srv *http.Server } @@ -207,7 +215,7 @@ type HTTPCollector struct { // If errCh is not nil, the collector will respond to HTTP requests with errors // sent on that channel. This means that if errCh is not nil Export calls will // block until an error is received. -func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, error) { +func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) { u, err := url.Parse(endpoint) if err != nil { return nil, err @@ -220,9 +228,9 @@ func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, erro } c := &HTTPCollector{ - headers: http.Header{}, - storage: NewStorage(), - errCh: errCh, + headers: http.Header{}, + storage: NewStorage(), + resultCh: resultCh, } c.listener, err = net.Listen("tcp", u.Host) @@ -276,22 +284,25 @@ func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) { c.respond(w, c.record(r)) } -func (c *HTTPCollector) record(r *http.Request) error { +func (c *HTTPCollector) record(r *http.Request) ExportResult { // Currently only supports protobuf. if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" { - return fmt.Errorf("content-type not supported: %s", v) + err := fmt.Errorf("content-type not supported: %s", v) + return ExportResult{Err: err} } body, err := c.readBody(r) if err != nil { - return err + return ExportResult{Err: err} } pbRequest := &collpb.ExportMetricsServiceRequest{} err = proto.Unmarshal(body, pbRequest) if err != nil { - return &HTTPResponseError{ - Err: err, - Status: http.StatusInternalServerError, + return ExportResult{ + Err: &HTTPResponseError{ + Err: err, + Status: http.StatusInternalServerError, + }, } } c.storage.Add(pbRequest) @@ -304,10 +315,10 @@ func (c *HTTPCollector) record(r *http.Request) error { } c.headersMu.Unlock() - if c.errCh != nil { - err = <-c.errCh + if c.resultCh != nil { + return <-c.resultCh } - return err + return ExportResult{Err: err} } func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) { @@ -345,12 +356,12 @@ func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) { return body, err } -func (c *HTTPCollector) respond(w http.ResponseWriter, err error) { - if err != nil { +func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) { + if resp.Err != nil { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") var e *HTTPResponseError - if errors.As(err, &e) { + if errors.As(resp.Err, &e) { for k, vals := range e.Header { for _, v := range vals { w.Header().Add(k, v) @@ -360,14 +371,22 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, err error) { fmt.Fprintln(w, e.Error()) } else { w.WriteHeader(http.StatusBadRequest) - fmt.Fprintln(w, err.Error()) + fmt.Fprintln(w, resp.Err.Error()) } return } w.Header().Set("Content-Type", "application/x-protobuf") w.WriteHeader(http.StatusOK) - _, _ = w.Write(emptyExportMetricsServiceResponse) + if resp.Response == nil { + _, _ = w.Write(emptyExportMetricsServiceResponse) + } else { + r, err := proto.Marshal(resp.Response) + if err != nil { + panic(err) + } + _, _ = w.Write(r) + } } type mathRandReader struct{} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index 0f1a0040f61..c001836c611 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" @@ -163,9 +165,17 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou defer cancel() return c.requestFunc(ctx, func(iCtx context.Context) error { - _, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, }) + if resp != nil && resp.PartialSuccess != nil { + msg := resp.PartialSuccess.GetErrorMessage() + n := resp.PartialSuccess.GetRejectedDataPoints() + if n != 0 || msg != "" { + err := internal.MetricPartialSuccessError(n, msg) + otel.Handle(err) + } + } // nil is converted to OK. if status.Code(err) == codes.OK { // Success. diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index 64d3b216a2d..ec6c27dfc87 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -130,8 +130,8 @@ func TestRetryable(t *testing.T) { } func TestClient(t *testing.T) { - factory := func() (otlpmetric.Client, otest.Collector) { - coll, err := otest.NewGRPCCollector("", nil) + factory := func(rCh <-chan otest.ExportResult) (otlpmetric.Client, otest.Collector) { + coll, err := otest.NewGRPCCollector("", rCh) require.NoError(t, err) ctx := context.Background() @@ -145,8 +145,8 @@ func TestClient(t *testing.T) { } func TestConfig(t *testing.T) { - factoryFunc := func(errCh <-chan error, o ...Option) (metric.Exporter, *otest.GRPCCollector) { - coll, err := otest.NewGRPCCollector("", errCh) + factoryFunc := func(rCh <-chan otest.ExportResult, o ...Option) (metric.Exporter, *otest.GRPCCollector) { + coll, err := otest.NewGRPCCollector("", rCh) require.NoError(t, err) ctx := context.Background() @@ -176,11 +176,11 @@ func TestConfig(t *testing.T) { }) t.Run("WithTimeout", func(t *testing.T) { - // Do not send on errCh so the Collector never responds to the client. - errCh := make(chan error) - t.Cleanup(func() { close(errCh) }) + // Do not send on rCh so the Collector never responds to the client. + rCh := make(chan otest.ExportResult) + t.Cleanup(func() { close(rCh) }) exp, coll := factoryFunc( - errCh, + rCh, WithTimeout(time.Millisecond), WithRetry(RetryConfig{Enabled: false}), ) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 9d1bac13f49..5232ac236a4 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -29,6 +29,7 @@ import ( "google.golang.org/protobuf/proto" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/internal" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" @@ -190,6 +191,29 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou switch resp.StatusCode { case http.StatusOK: // Success, do not retry. + + // Read the partial success message, if any. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return err + } + + if respData.Len() != 0 { + var respProto colmetricpb.ExportMetricsServiceResponse + if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { + return err + } + + if respProto.PartialSuccess != nil { + msg := respProto.PartialSuccess.GetErrorMessage() + n := respProto.PartialSuccess.GetRejectedDataPoints() + if n != 0 || msg != "" { + err := internal.MetricPartialSuccessError(n, msg) + otel.Handle(err) + } + } + } + return nil case http.StatusTooManyRequests, http.StatusServiceUnavailable: // Retry-able failure. diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 09a6c15d82c..82e5f8ed29f 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -34,8 +34,8 @@ import ( ) func TestClient(t *testing.T) { - factory := func() (otlpmetric.Client, otest.Collector) { - coll, err := otest.NewHTTPCollector("", nil) + factory := func(rCh <-chan otest.ExportResult) (otlpmetric.Client, otest.Collector) { + coll, err := otest.NewHTTPCollector("", rCh) require.NoError(t, err) addr := coll.Addr().String() @@ -48,8 +48,8 @@ func TestClient(t *testing.T) { } func TestConfig(t *testing.T) { - factoryFunc := func(ePt string, errCh <-chan error, o ...Option) (metric.Exporter, *otest.HTTPCollector) { - coll, err := otest.NewHTTPCollector(ePt, errCh) + factoryFunc := func(ePt string, rCh <-chan otest.ExportResult, o ...Option) (metric.Exporter, *otest.HTTPCollector) { + coll, err := otest.NewHTTPCollector(ePt, rCh) require.NoError(t, err) opts := []Option{WithEndpoint(coll.Addr().String())} @@ -81,18 +81,18 @@ func TestConfig(t *testing.T) { }) t.Run("WithTimeout", func(t *testing.T) { - // Do not send on errCh so the Collector never responds to the client. - errCh := make(chan error) + // Do not send on rCh so the Collector never responds to the client. + rCh := make(chan otest.ExportResult) exp, coll := factoryFunc( "", - errCh, + rCh, WithTimeout(time.Millisecond), WithRetry(RetryConfig{Enabled: false}), ) ctx := context.Background() t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) // Push this after Shutdown so the HTTP server doesn't hang. - t.Cleanup(func() { close(errCh) }) + t.Cleanup(func() { close(rCh) }) t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) err := exp.Export(ctx, metricdata.ResourceMetrics{}) assert.ErrorContains(t, err, context.DeadlineExceeded.Error()) @@ -109,13 +109,20 @@ func TestConfig(t *testing.T) { t.Run("WithRetry", func(t *testing.T) { emptyErr := errors.New("") - errCh := make(chan error, 3) + rCh := make(chan otest.ExportResult, 3) header := http.Header{http.CanonicalHeaderKey("Retry-After"): {"10"}} // Both retryable errors. - errCh <- &otest.HTTPResponseError{Status: http.StatusServiceUnavailable, Err: emptyErr, Header: header} - errCh <- &otest.HTTPResponseError{Status: http.StatusTooManyRequests, Err: emptyErr} - errCh <- nil - exp, coll := factoryFunc("", errCh, WithRetry(RetryConfig{ + rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{ + Status: http.StatusServiceUnavailable, + Err: emptyErr, + Header: header, + }} + rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{ + Status: http.StatusTooManyRequests, + Err: emptyErr, + }} + rCh <- otest.ExportResult{} + exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{ Enabled: true, InitialInterval: time.Nanosecond, MaxInterval: time.Millisecond, @@ -124,10 +131,10 @@ func TestConfig(t *testing.T) { ctx := context.Background() t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) // Push this after Shutdown so the HTTP server doesn't hang. - t.Cleanup(func() { close(errCh) }) + t.Cleanup(func() { close(rCh) }) t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) assert.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}), "failed retry") - assert.Len(t, errCh, 0, "failed HTTP responses did not occur") + assert.Len(t, rCh, 0, "failed HTTP responses did not occur") }) t.Run("WithURLPath", func(t *testing.T) {