Skip to content

Commit

Permalink
Handle partial success response from OTLP server in otlpmetric export…
Browse files Browse the repository at this point in the history
…ers (#3440)

* Handle partial success resp from OTLP server

* Test partial success response handling

* Add changes to changelog

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
MrAlias and hanyuancheung committed Nov 9, 2022
1 parent 2694dbf commit 484c8bd
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
- Reenabled Attribute Filters in the Metric SDK. (#3396)
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)
- Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440)

## [1.11.1/0.33.0] 2022-10-19

Expand Down
63 changes: 57 additions & 6 deletions exporters/otlp/otlpmetric/internal/otest/client.go
Expand Up @@ -16,6 +16,7 @@ package otest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/inte

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -24,9 +25,11 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/metric/unit"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
Expand Down Expand Up @@ -168,7 +171,10 @@ var (
// otlpmetric.Client implementation that is connected to the also-returned
// Collector implementation. The Client is ready to upload metric data to the
// Collector, which is ready to store that data.
type ClientFactory func() (otlpmetric.Client, Collector)
//
// If resultCh is not nil, the returned Collector needs to use the responses
// from that channel to send back to the client for every export request.
type ClientFactory func(resultCh <-chan ExportResult) (otlpmetric.Client, Collector)

// RunClientTests runs a suite of Client integration tests. For example:
//
Expand All @@ -177,17 +183,17 @@ func RunClientTests(f ClientFactory) func(*testing.T) {
return func(t *testing.T) {
t.Run("ClientHonorsContextErrors", func(t *testing.T) {
t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return c.Shutdown
}))

t.Run("ForceFlush", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return c.ForceFlush
}))

t.Run("UploadMetrics", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return func(ctx context.Context) error {
return c.UploadMetrics(ctx, nil)
}
Expand All @@ -196,7 +202,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) {

t.Run("ForceFlushFlushes", func(t *testing.T) {
ctx := context.Background()
client, collector := f()
client, collector := f(nil)
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))

require.NoError(t, client.ForceFlush(ctx))
Expand All @@ -211,7 +217,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) {

t.Run("UploadMetrics", func(t *testing.T) {
ctx := context.Background()
client, coll := f()
client, coll := f(nil)

require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.Shutdown(ctx))
Expand All @@ -222,6 +228,51 @@ func RunClientTests(f ClientFactory) func(*testing.T) {
t.Fatalf("unexpected ResourceMetrics:\n%s", diff)
}
})

t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan ExportResult, 3)
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{
PartialSuccess: &collpb.ExportMetricsPartialSuccess{
RejectedDataPoints: n,
ErrorMessage: msg,
},
},
}
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{
PartialSuccess: &collpb.ExportMetricsPartialSuccess{
// Should not be logged.
RejectedDataPoints: 0,
ErrorMessage: "",
},
},
}
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{},
}

ctx := context.Background()
client, _ := f(rCh)

defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())

errs := []error{}
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)

require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.Shutdown(ctx))

require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
}
}

Expand Down
18 changes: 16 additions & 2 deletions exporters/otlp/otlpmetric/internal/otest/client_test.go
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand All @@ -28,6 +30,7 @@ import (
)

// client is an in-memory test double that is returned as both the
// otlpmetric.Client and the Collector by the factory in TestClientTests.
type client struct {
// rCh, when not nil, is received from once per UploadMetrics call; the
// received ExportResult determines the partial-success error handled and
// the error returned by that call.
rCh <-chan ExportResult
// storage holds every request passed to UploadMetrics.
storage *Storage
}

Expand All @@ -47,15 +50,26 @@ func (c *client) UploadMetrics(ctx context.Context, rm *mpb.ResourceMetrics) err
c.storage.Add(&cpb.ExportMetricsServiceRequest{
ResourceMetrics: []*mpb.ResourceMetrics{rm},
})
if c.rCh != nil {
r := <-c.rCh
if r.Response != nil && r.Response.GetPartialSuccess() != nil {
msg := r.Response.GetPartialSuccess().GetErrorMessage()
n := r.Response.GetPartialSuccess().GetRejectedDataPoints()
if msg != "" || n > 0 {
otel.Handle(internal.MetricPartialSuccessError(n, msg))
}
}
return r.Err
}
return ctx.Err()
}

// ForceFlush performs no work; it only reports the error, if any, of ctx.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }
// Shutdown performs no work; it only reports the error, if any, of ctx.
func (c *client) Shutdown(ctx context.Context) error { return ctx.Err() }

func TestClientTests(t *testing.T) {
factory := func() (otlpmetric.Client, Collector) {
c := &client{storage: NewStorage()}
factory := func(rCh <-chan ExportResult) (otlpmetric.Client, Collector) {
c := &client{rCh: rCh, storage: NewStorage()}
return c, c
}

Expand Down
73 changes: 46 additions & 27 deletions exporters/otlp/otlpmetric/internal/otest/collector.go
Expand Up @@ -50,6 +50,11 @@ type Collector interface {
Collect() *Storage
}

// ExportResult is the outcome a Collector reports for a single export
// request: the response message to send back to the client and the error, if
// any, to respond with.
type ExportResult struct {
// Response is the export response returned to the client. The gRPC and
// HTTP collectors substitute an empty ExportMetricsServiceResponse when it
// is nil.
Response *collpb.ExportMetricsServiceResponse
// Err, when not nil, is the error the collector responds with. The gRPC
// collector returns it from Export; the HTTP collector writes it as an
// error response instead of Response.
Err error
}

// Storage stores uploaded OTLP metric data in their proto form.
type Storage struct {
dataMu sync.Mutex
Expand Down Expand Up @@ -86,7 +91,7 @@ type GRPCCollector struct {
headers metadata.MD
storage *Storage

errCh <-chan error
resultCh <-chan ExportResult
listener net.Listener
srv *grpc.Server
}
Expand All @@ -100,14 +105,14 @@ type GRPCCollector struct {
// If resultCh is not nil, the collector will respond to Export calls with the
// results sent on that channel. This means that if resultCh is not nil Export
// calls will block until a result is received.
func NewGRPCCollector(endpoint string, errCh <-chan error) (*GRPCCollector, error) {
func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) {
if endpoint == "" {
endpoint = "localhost:0"
}

c := &GRPCCollector{
storage: NewStorage(),
errCh: errCh,
storage: NewStorage(),
resultCh: resultCh,
}

var err error
Expand Down Expand Up @@ -155,11 +160,14 @@ func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsSer
c.headersMu.Unlock()
}

var err error
if c.errCh != nil {
err = <-c.errCh
if c.resultCh != nil {
r := <-c.resultCh
if r.Response == nil {
return &collpb.ExportMetricsServiceResponse{}, r.Err
}
return r.Response, r.Err
}
return &collpb.ExportMetricsServiceResponse{}, err
return &collpb.ExportMetricsServiceResponse{}, nil
}

var emptyExportMetricsServiceResponse = func() []byte {
Expand Down Expand Up @@ -189,7 +197,7 @@ type HTTPCollector struct {
headers http.Header
storage *Storage

errCh <-chan error
resultCh <-chan ExportResult
listener net.Listener
srv *http.Server
}
Expand All @@ -207,7 +215,7 @@ type HTTPCollector struct {
// If resultCh is not nil, the collector will respond to HTTP requests with the
// results sent on that channel. This means that if resultCh is not nil Export
// calls will block until a result is received.
func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, error) {
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
Expand All @@ -220,9 +228,9 @@ func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, erro
}

c := &HTTPCollector{
headers: http.Header{},
storage: NewStorage(),
errCh: errCh,
headers: http.Header{},
storage: NewStorage(),
resultCh: resultCh,
}

c.listener, err = net.Listen("tcp", u.Host)
Expand Down Expand Up @@ -276,22 +284,25 @@ func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) {
c.respond(w, c.record(r))
}

func (c *HTTPCollector) record(r *http.Request) error {
func (c *HTTPCollector) record(r *http.Request) ExportResult {
// Currently only supports protobuf.
if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" {
return fmt.Errorf("content-type not supported: %s", v)
err := fmt.Errorf("content-type not supported: %s", v)
return ExportResult{Err: err}
}

body, err := c.readBody(r)
if err != nil {
return err
return ExportResult{Err: err}
}
pbRequest := &collpb.ExportMetricsServiceRequest{}
err = proto.Unmarshal(body, pbRequest)
if err != nil {
return &HTTPResponseError{
Err: err,
Status: http.StatusInternalServerError,
return ExportResult{
Err: &HTTPResponseError{
Err: err,
Status: http.StatusInternalServerError,
},
}
}
c.storage.Add(pbRequest)
Expand All @@ -304,10 +315,10 @@ func (c *HTTPCollector) record(r *http.Request) error {
}
c.headersMu.Unlock()

if c.errCh != nil {
err = <-c.errCh
if c.resultCh != nil {
return <-c.resultCh
}
return err
return ExportResult{Err: err}
}

func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
Expand Down Expand Up @@ -345,12 +356,12 @@ func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
return body, err
}

func (c *HTTPCollector) respond(w http.ResponseWriter, err error) {
if err != nil {
func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
if resp.Err != nil {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
var e *HTTPResponseError
if errors.As(err, &e) {
if errors.As(resp.Err, &e) {
for k, vals := range e.Header {
for _, v := range vals {
w.Header().Add(k, v)
Expand All @@ -360,14 +371,22 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, err error) {
fmt.Fprintln(w, e.Error())
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, err.Error())
fmt.Fprintln(w, resp.Err.Error())
}
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(emptyExportMetricsServiceResponse)
if resp.Response == nil {
_, _ = w.Write(emptyExportMetricsServiceResponse)
} else {
r, err := proto.Marshal(resp.Response)
if err != nil {
panic(err)
}
_, _ = w.Write(r)
}
}

type mathRandReader struct{}
Expand Down
12 changes: 11 additions & 1 deletion exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Expand Up @@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
Expand Down Expand Up @@ -163,9 +165,17 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
defer cancel()

return c.requestFunc(ctx, func(iCtx context.Context) error {
_, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
resp, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
})
if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedDataPoints()
if n != 0 || msg != "" {
err := internal.MetricPartialSuccessError(n, msg)
otel.Handle(err)
}
}
// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
Expand Down

0 comments on commit 484c8bd

Please sign in to comment.