Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add back the otlpmetrichttp exporter #3097

Merged
merged 42 commits into from Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8c828e6
Add otlpmetric package doc
MrAlias Aug 12, 2022
65e1c94
Add Client interface
MrAlias Aug 12, 2022
5bfc408
Add the Exporter
MrAlias Aug 12, 2022
92aa3b5
Add race detection test for Exporter
MrAlias Aug 12, 2022
023ed33
Expand New godocs
MrAlias Aug 12, 2022
cefa7ab
Fix lint
MrAlias Aug 12, 2022
1c37b18
Merge branch 'new_sdk/main' into otlpmetric-exporter
MrAlias Aug 15, 2022
80e7287
Add back the otlpmetrichttp pkg from main
MrAlias Aug 16, 2022
1a03ee1
Restrict to Go 1.18 and above
MrAlias Aug 16, 2022
07b3a70
Remove integration testing
MrAlias Aug 16, 2022
3b8458d
Rename client_unit_test.go to client_test.go
MrAlias Aug 16, 2022
fee6cd7
Rename options.go to config.go
MrAlias Aug 16, 2022
28cc143
Remove the NewUnstarted func
MrAlias Aug 16, 2022
35c2e24
Remove Start method from client
MrAlias Aug 16, 2022
18f2ead
Add no-op ForceFlush method to client
MrAlias Aug 16, 2022
cdd614e
Update otlpconfig pkg name to oconf
MrAlias Aug 16, 2022
516346b
Rename Stop method to Shutdown
MrAlias Aug 16, 2022
6fdde7e
Update creation functions to compile
MrAlias Aug 16, 2022
a889859
Remove name field from client
MrAlias Aug 16, 2022
6295a37
Remove sync of methods from client
MrAlias Aug 16, 2022
0cf6737
Remove unused generalCfg field from client
MrAlias Aug 16, 2022
1b99035
Replace cfg client field with used conf vals
MrAlias Aug 16, 2022
39ed504
Use a http request instead of url/header fields
MrAlias Aug 17, 2022
f857949
Remove NewClient and move New into client.go
MrAlias Aug 17, 2022
0e2ad22
Rename client.client field to client.httpClient
MrAlias Aug 17, 2022
6bef8fa
Update client tests
MrAlias Aug 17, 2022
9b0200e
Remove deprecated WithMaxAttempts and WithBackoff
MrAlias Aug 17, 2022
060f304
Update option docs
MrAlias Aug 17, 2022
89c696a
Fix lint
MrAlias Aug 17, 2022
434cdf1
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Aug 25, 2022
1ae0ea9
Fix lint errors
MrAlias Aug 25, 2022
2df85c5
Revert New to accept a context
MrAlias Aug 25, 2022
8515bd1
Add example test
MrAlias Aug 25, 2022
125c156
Update pkg docs
MrAlias Aug 25, 2022
cfe9f69
go mod tidy
MrAlias Aug 25, 2022
96687b2
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Aug 30, 2022
86bbdc8
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Sep 1, 2022
b300125
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Sep 2, 2022
2ef5232
Use url.URL to form HTTP request URL
MrAlias Sep 2, 2022
45569fa
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Sep 6, 2022
c53336b
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Sep 8, 2022
4646d70
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias Sep 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/dependabot.yml
Expand Up @@ -136,6 +136,15 @@ updates:
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /exporters/otlp/otlpmetric/otlpmetrichttp
labels:
- dependencies
- go
- Skip Changelog
schedule:
interval: weekly
day: sunday
- package-ecosystem: gomod
directory: /exporters/otlp/otlptrace
labels:
Expand Down
286 changes: 286 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
@@ -0,0 +1,286 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
"sync"
"time"

"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// New returns an OpenTelemetry metric Exporter. The Exporter can be used with
// a PeriodicReader to export OpenTelemetry metric data to an OTLP receiving
// endpoint using protobufs over HTTP.
func New(_ context.Context, opts ...Option) (metric.Exporter, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we aren't using the context.Context, do we need it in the New()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good question. I went both ways on this.

By including it we ...

  • Ensure this is equivalent with the metric gRPC exporter New, the trace HTTP New, and the trace gRPC New
  • Allow us to use a context in the future if we discover a need to

However, by including it, we are asking users to provide us something that is not used.

I'm open to opinions here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think passing the context for consistency (and possible future use) makes sense.

c, err := newClient(opts...)
if err != nil {
return nil, err
}
return otlpmetric.New(c), nil
}

type client struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
}

// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by another package.
var ourTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

// newClient creates a new HTTP metric client.
func newClient(opts ...Option) (otlpmetric.Client, error) {
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)

httpClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.Metrics.Timeout,
}
if cfg.Metrics.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Metrics.TLSCfg
httpClient.Transport = transport
}

format := "https://%s%s"
if cfg.Metrics.Insecure {
format = "http://%s%s"
}
rawURL := fmt.Sprintf(format, cfg.Metrics.Endpoint, cfg.Metrics.URLPath)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// Body is set when this is cloned during upload.
req, err := http.NewRequest(http.MethodPost, rawURL, http.NoBody)
if err != nil {
return nil, err
}

if n := len(cfg.Metrics.Headers); n > 0 {
for k, v := range cfg.Metrics.Headers {
req.Header.Set(k, v)
}
}
req.Header.Set("Content-Type", "application/x-protobuf")

return &client{
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
}, nil
}

// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }

// Shutdown shuts down the client, freeing all resources.
func (c *client) Shutdown(ctx context.Context) error {
// The otlpmetric.Exporter synchronizes access to client methods and
// ensures this is called only once. The only thing that needs to be done
// here is to release any computational resources the client holds.

c.requestFunc = nil
c.httpClient = nil
return ctx.Err()
}

// UploadMetrics sends protoMetrics to the connected endpoint.
//
// Retryable errors from the server will be handled according to any
// RetryConfig the client was created with.
func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics) error {
// The otlpmetric.Exporter synchronizes access to client methods, and
// ensures this is not called after the Exporter is shutdown. Only thing
// to do here is send data.

pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}
body, err := proto.Marshal(pbRequest)
if err != nil {
return err
}
request, err := c.newRequest(ctx, body)
if err != nil {
return err
}

return c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
return iCtx.Err()
default:
}

request.reset(iCtx)
resp, err := c.httpClient.Do(request.Request)
if err != nil {
return err
}

var rErr error
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
switch resp.StatusCode {
case http.StatusOK:
// Success, do not retry.
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)

// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status)
}

if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}

var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(io.Discard)
return w
},
}

func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
r := c.req.Clone(ctx)
req := request{Request: r}

switch c.compression {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

var b bytes.Buffer
gz.Reset(&b)

if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure body if fully written.
if err := gz.Close(); err != nil {
return req, err
}

req.bodyReader = bodyReader(b.Bytes())
}

return req, nil
}

// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return io.NopCloser(bytes.NewReader(buf))
}
}

// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request

// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}

// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}

// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}

// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}
return rErr
}

func (e retryableError) Error() string {
return "retry-able request failure"
}

// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}

rErr, ok := err.(retryableError)
if !ok {
return false, 0
}

return true, time.Duration(rErr.throttle)
}
74 changes: 74 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package otlpmetrichttp

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClientHonorsContextErrors(t *testing.T) {
t.Run("Shutdown", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return c.Shutdown
}))

t.Run("ForceFlush", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return c.ForceFlush
}))

t.Run("UploadMetrics", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient()
require.NoError(t, err)
return func(ctx context.Context) error {
return c.UploadMetrics(ctx, nil)
}
}))
}

func testCtxErr(factory func(*testing.T) func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}