Skip to content

Commit

Permalink
Add back the otlpmetrichttp exporter (#3097)
Browse files Browse the repository at this point in the history
* Add otlpmetric package doc

* Add Client interface

* Add the Exporter

Have the Exporter ensure synchronous access to all client methods.

* Add race detection test for Exporter

* Expand New godocs

* Fix lint

* Add back the otlpmetrichttp pkg from main

* Restrict to Go 1.18 and above

* Remove integration testing

* Rename client_unit_test.go to client_test.go

* Rename options.go to config.go

* Remove the NewUnstarted func

* Remove Start method from client

* Add no-op ForceFlush method to client

* Update otlpconfig pkg name to oconf

* Rename Stop method to Shutdown

Match the otlpmetric.Client interface.

* Update creation functions to compile

* Remove name field from client

* Remove sync of methods from client

This is handled by the exporter.

* Remove unused generalCfg field from client

* Replace cfg client field with used conf vals

* Use a http request instead of url/header fields

* Remove NewClient and move New into client.go

* Rename client.client field to client.httpClient

* Update client tests

Remove test of a retry config and add functional tests of the client
methods honoring a context.

* Remove deprecated WithMaxAttempts and WithBackoff

* Update option docs

Include info on envvars.

* Fix lint

* Fix lint errors

* Revert New to accept a context

* Add example test

* Update pkg docs

* go mod tidy

* Use url.URL to form HTTP request URL
  • Loading branch information
MrAlias committed Sep 8, 2022
1 parent 40319d9 commit 9550f7b
Show file tree
Hide file tree
Showing 8 changed files with 1,100 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .github/dependabot.yml
Expand Up @@ -145,6 +145,15 @@ updates:
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /exporters/otlp/otlpmetric/otlpmetrichttp
labels:
- dependencies
- go
- Skip Changelog
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /exporters/otlp/otlptrace
labels:
Expand Down
290 changes: 290 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
@@ -0,0 +1,290 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"

"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// New returns an OpenTelemetry metric Exporter. The Exporter can be used with
// a PeriodicReader to export OpenTelemetry metric data to an OTLP receiving
// endpoint using protobufs over HTTP.
func New(_ context.Context, opts ...Option) (metric.Exporter, error) {
c, err := newClient(opts...)
if err != nil {
return nil, err
}
return otlpmetric.New(c), nil
}

type client struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
}

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by another package.
var ourTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

// newClient creates a new HTTP metric client.
func newClient(opts ...Option) (otlpmetric.Client, error) {
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)

httpClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.Metrics.Timeout,
}
if cfg.Metrics.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Metrics.TLSCfg
httpClient.Transport = transport
}

u := &url.URL{
Scheme: "https",
Host: cfg.Metrics.Endpoint,
Path: cfg.Metrics.URLPath,
}
if cfg.Metrics.Insecure {
u.Scheme = "http"
}
// Body is set when this is cloned during upload.
req, err := http.NewRequest(http.MethodPost, u.String(), http.NoBody)
if err != nil {
return nil, err
}

if n := len(cfg.Metrics.Headers); n > 0 {
for k, v := range cfg.Metrics.Headers {
req.Header.Set(k, v)
}
}
req.Header.Set("Content-Type", "application/x-protobuf")

return &client{
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
}, nil
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

// Shutdown shuts down the client, freeing all resources.
func (c *client) Shutdown(ctx context.Context) error {
// The otlpmetric.Exporter synchronizes access to client methods and
// ensures this is called only once. The only thing that needs to be done
// here is to release any computational resources the client holds.

c.requestFunc = nil
c.httpClient = nil
return ctx.Err()
}

// UploadMetrics sends protoMetrics to the connected endpoint.
//
// Retryable errors from the server will be handled according to any
// RetryConfig the client was created with.
func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics) error {
// The otlpmetric.Exporter synchronizes access to client methods, and
// ensures this is not called after the Exporter is shutdown. Only thing
// to do here is send data.

pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}
body, err := proto.Marshal(pbRequest)
if err != nil {
return err
}
request, err := c.newRequest(ctx, body)
if err != nil {
return err
}

return c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
return iCtx.Err()
default:
}

request.reset(iCtx)
resp, err := c.httpClient.Do(request.Request)
if err != nil {
return err
}

var rErr error
switch resp.StatusCode {
case http.StatusOK:
// Success, do not retry.
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)

// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status)
}

if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}

var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(io.Discard)
return w
},
}

func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
r := c.req.Clone(ctx)
req := request{Request: r}

switch c.compression {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

var b bytes.Buffer
gz.Reset(&b)

if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure body if fully written.
if err := gz.Close(); err != nil {
return req, err
}

req.bodyReader = bodyReader(b.Bytes())
}

return req, nil
}

// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return io.NopCloser(bytes.NewReader(buf))
}
}

// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request

// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}

// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}

// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}
return rErr
}

func (e retryableError) Error() string {
return "retry-able request failure"
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}

rErr, ok := err.(retryableError)
if !ok {
return false, 0
}

return true, time.Duration(rErr.throttle)
}
74 changes: 74 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package otlpmetrichttp

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClientHonorsContextErrors(t *testing.T) {
t.Run("Shutdown", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return c.Shutdown
}))

t.Run("ForceFlush", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return c.ForceFlush
}))

t.Run("UploadMetrics", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return func(ctx context.Context) error {
return c.UploadMetrics(ctx, nil)
}
}))
}

func testCtxErr(factory func(*testing.T) func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}

0 comments on commit 9550f7b

Please sign in to comment.