New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add back the otlpmetrichttp exporter #3097
Merged
Merged
Changes from 37 commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
8c828e6
Add otlpmetric package doc
MrAlias 65e1c94
Add Client interface
MrAlias 5bfc408
Add the Exporter
MrAlias 92aa3b5
Add race detection test for Exporter
MrAlias 023ed33
Expand New godocs
MrAlias cefa7ab
Fix lint
MrAlias 1c37b18
Merge branch 'new_sdk/main' into otlpmetric-exporter
MrAlias 80e7287
Add back the otlpmetrichttp pkg from main
MrAlias 1a03ee1
Restrict to Go 1.18 and above
MrAlias 07b3a70
Remove integration testing
MrAlias 3b8458d
Rename client_unit_test.go to client_test.go
MrAlias fee6cd7
Rename options.go to config.go
MrAlias 28cc143
Remove the NewUnstarted func
MrAlias 35c2e24
Remove Start method from client
MrAlias 18f2ead
Add no-op ForceFlush method to client
MrAlias cdd614e
Update otlpconfig pkg name to oconf
MrAlias 516346b
Rename Stop method to Shutdown
MrAlias 6fdde7e
Update creation functions to compile
MrAlias a889859
Remove name field from client
MrAlias 6295a37
Remove sync of methods from client
MrAlias 0cf6737
Remove unused generalCfg field from client
MrAlias 1b99035
Replace cfg client field with used conf vals
MrAlias 39ed504
Use a http request instead of url/header fields
MrAlias f857949
Remove NewClient and move New into client.go
MrAlias 0e2ad22
Rename client.client field to client.httpClient
MrAlias 6bef8fa
Update client tests
MrAlias 9b0200e
Remove deprecated WithMaxAttempts and WithBackoff
MrAlias 060f304
Update option docs
MrAlias 89c696a
Fix lint
MrAlias 434cdf1
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias 1ae0ea9
Fix lint errors
MrAlias 2df85c5
Revert New to accept a context
MrAlias 8515bd1
Add example test
MrAlias 125c156
Update pkg docs
MrAlias cfe9f69
go mod tidy
MrAlias 96687b2
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias 86bbdc8
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias b300125
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias 2ef5232
Use url.URL to form HTTP request URL
MrAlias 45569fa
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias c53336b
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias 4646d70
Merge branch 'new_sdk/main' into otlpmetric-http
MrAlias File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,286 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
//go:build go1.18 | ||
// +build go1.18 | ||
|
||
package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" | ||
|
||
import ( | ||
"bytes" | ||
"compress/gzip" | ||
"context" | ||
"fmt" | ||
"io" | ||
"net" | ||
"net/http" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"google.golang.org/protobuf/proto" | ||
|
||
"go.opentelemetry.io/otel/exporters/otlp/internal/retry" | ||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric" | ||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf" | ||
"go.opentelemetry.io/otel/sdk/metric" | ||
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" | ||
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
) | ||
|
||
// New returns an OpenTelemetry metric Exporter. The Exporter can be used with | ||
// a PeriodicReader to export OpenTelemetry metric data to an OTLP receiving | ||
// endpoint using protobufs over HTTP. | ||
func New(_ context.Context, opts ...Option) (metric.Exporter, error) { | ||
c, err := newClient(opts...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return otlpmetric.New(c), nil | ||
} | ||
|
||
type client struct { | ||
// req is cloned for every upload the client makes. | ||
req *http.Request | ||
compression Compression | ||
requestFunc retry.RequestFunc | ||
httpClient *http.Client | ||
} | ||
|
||
// Keep it in sync with golang's DefaultTransport from net/http! We | ||
// have our own copy to avoid handling a situation where the | ||
// DefaultTransport is overwritten with some different implementation | ||
// of http.RoundTripper or it's modified by another package. | ||
var ourTransport = &http.Transport{ | ||
Proxy: http.ProxyFromEnvironment, | ||
DialContext: (&net.Dialer{ | ||
Timeout: 30 * time.Second, | ||
KeepAlive: 30 * time.Second, | ||
}).DialContext, | ||
ForceAttemptHTTP2: true, | ||
MaxIdleConns: 100, | ||
IdleConnTimeout: 90 * time.Second, | ||
TLSHandshakeTimeout: 10 * time.Second, | ||
ExpectContinueTimeout: 1 * time.Second, | ||
} | ||
|
||
// newClient creates a new HTTP metric client. | ||
func newClient(opts ...Option) (otlpmetric.Client, error) { | ||
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) | ||
|
||
httpClient := &http.Client{ | ||
Transport: ourTransport, | ||
Timeout: cfg.Metrics.Timeout, | ||
} | ||
if cfg.Metrics.TLSCfg != nil { | ||
transport := ourTransport.Clone() | ||
transport.TLSClientConfig = cfg.Metrics.TLSCfg | ||
httpClient.Transport = transport | ||
} | ||
|
||
format := "https://%s%s" | ||
if cfg.Metrics.Insecure { | ||
format = "http://%s%s" | ||
} | ||
rawURL := fmt.Sprintf(format, cfg.Metrics.Endpoint, cfg.Metrics.URLPath) | ||
MrAlias marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Body is set when this is cloned during upload. | ||
req, err := http.NewRequest(http.MethodPost, rawURL, http.NoBody) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if n := len(cfg.Metrics.Headers); n > 0 { | ||
for k, v := range cfg.Metrics.Headers { | ||
req.Header.Set(k, v) | ||
} | ||
} | ||
req.Header.Set("Content-Type", "application/x-protobuf") | ||
|
||
return &client{ | ||
compression: Compression(cfg.Metrics.Compression), | ||
req: req, | ||
requestFunc: cfg.RetryConfig.RequestFunc(evaluate), | ||
httpClient: httpClient, | ||
}, nil | ||
} | ||
|
||
// ForceFlush does nothing, the client holds no state. | ||
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() } | ||
|
||
// Shutdown shuts down the client, freeing all resources. | ||
func (c *client) Shutdown(ctx context.Context) error { | ||
// The otlpmetric.Exporter synchronizes access to client methods and | ||
// ensures this is called only once. The only thing that needs to be done | ||
// here is to release any computational resources the client holds. | ||
|
||
c.requestFunc = nil | ||
c.httpClient = nil | ||
return ctx.Err() | ||
} | ||
|
||
// UploadMetrics sends protoMetrics to the connected endpoint. | ||
// | ||
// Retryable errors from the server will be handled according to any | ||
// RetryConfig the client was created with. | ||
func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics) error { | ||
// The otlpmetric.Exporter synchronizes access to client methods, and | ||
// ensures this is not called after the Exporter is shutdown. Only thing | ||
// to do here is send data. | ||
|
||
pbRequest := &colmetricpb.ExportMetricsServiceRequest{ | ||
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, | ||
} | ||
body, err := proto.Marshal(pbRequest) | ||
if err != nil { | ||
return err | ||
} | ||
request, err := c.newRequest(ctx, body) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return c.requestFunc(ctx, func(iCtx context.Context) error { | ||
select { | ||
case <-iCtx.Done(): | ||
return iCtx.Err() | ||
default: | ||
} | ||
|
||
request.reset(iCtx) | ||
resp, err := c.httpClient.Do(request.Request) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var rErr error | ||
MrAlias marked this conversation as resolved.
Show resolved
Hide resolved
|
||
switch resp.StatusCode { | ||
case http.StatusOK: | ||
// Success, do not retry. | ||
case http.StatusTooManyRequests, | ||
http.StatusServiceUnavailable: | ||
// Retry-able failure. | ||
rErr = newResponseError(resp.Header) | ||
|
||
// Going to retry, drain the body to reuse the connection. | ||
if _, err := io.Copy(io.Discard, resp.Body); err != nil { | ||
_ = resp.Body.Close() | ||
return err | ||
} | ||
default: | ||
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status) | ||
} | ||
|
||
if err := resp.Body.Close(); err != nil { | ||
return err | ||
} | ||
return rErr | ||
}) | ||
} | ||
|
||
var gzPool = sync.Pool{ | ||
New: func() interface{} { | ||
w := gzip.NewWriter(io.Discard) | ||
return w | ||
}, | ||
} | ||
|
||
func (c *client) newRequest(ctx context.Context, body []byte) (request, error) { | ||
r := c.req.Clone(ctx) | ||
req := request{Request: r} | ||
|
||
switch c.compression { | ||
case NoCompression: | ||
r.ContentLength = (int64)(len(body)) | ||
req.bodyReader = bodyReader(body) | ||
case GzipCompression: | ||
// Ensure the content length is not used. | ||
r.ContentLength = -1 | ||
r.Header.Set("Content-Encoding", "gzip") | ||
|
||
gz := gzPool.Get().(*gzip.Writer) | ||
defer gzPool.Put(gz) | ||
|
||
var b bytes.Buffer | ||
gz.Reset(&b) | ||
|
||
if _, err := gz.Write(body); err != nil { | ||
return req, err | ||
} | ||
// Close needs to be called to ensure body if fully written. | ||
if err := gz.Close(); err != nil { | ||
return req, err | ||
} | ||
|
||
req.bodyReader = bodyReader(b.Bytes()) | ||
} | ||
|
||
return req, nil | ||
} | ||
|
||
// bodyReader returns a closure returning a new reader for buf. | ||
func bodyReader(buf []byte) func() io.ReadCloser { | ||
return func() io.ReadCloser { | ||
return io.NopCloser(bytes.NewReader(buf)) | ||
} | ||
} | ||
|
||
// request wraps an http.Request with a resettable body reader. | ||
type request struct { | ||
*http.Request | ||
|
||
// bodyReader allows the same body to be used for multiple requests. | ||
bodyReader func() io.ReadCloser | ||
} | ||
|
||
// reset reinitializes the request Body and uses ctx for the request. | ||
func (r *request) reset(ctx context.Context) { | ||
r.Body = r.bodyReader() | ||
r.Request = r.Request.WithContext(ctx) | ||
} | ||
|
||
// retryableError represents a request failure that can be retried. | ||
type retryableError struct { | ||
throttle int64 | ||
} | ||
|
||
// newResponseError returns a retryableError and will extract any explicit | ||
// throttle delay contained in headers. | ||
func newResponseError(header http.Header) error { | ||
var rErr retryableError | ||
if s, ok := header["Retry-After"]; ok { | ||
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil { | ||
rErr.throttle = t | ||
} | ||
} | ||
return rErr | ||
} | ||
|
||
func (e retryableError) Error() string { | ||
return "retry-able request failure" | ||
} | ||
|
||
// evaluate returns if err is retry-able. If it is and it includes an explicit | ||
// throttling delay, that delay is also returned. | ||
func evaluate(err error) (bool, time.Duration) { | ||
if err == nil { | ||
return false, 0 | ||
} | ||
|
||
rErr, ok := err.(retryableError) | ||
if !ok { | ||
return false, 0 | ||
} | ||
|
||
return true, time.Duration(rErr.throttle) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
//go:build go1.18 | ||
// +build go1.18 | ||
|
||
package otlpmetrichttp | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestClientHonorsContextErrors(t *testing.T) { | ||
t.Run("Shutdown", testCtxErr(func(t *testing.T) func(context.Context) error { | ||
c, err := newClient() | ||
require.NoError(t, err) | ||
return c.Shutdown | ||
})) | ||
|
||
t.Run("ForceFlush", testCtxErr(func(t *testing.T) func(context.Context) error { | ||
c, err := newClient() | ||
require.NoError(t, err) | ||
return c.ForceFlush | ||
})) | ||
|
||
t.Run("UploadMetrics", testCtxErr(func(t *testing.T) func(context.Context) error { | ||
c, err := newClient() | ||
require.NoError(t, err) | ||
return func(ctx context.Context) error { | ||
return c.UploadMetrics(ctx, nil) | ||
} | ||
})) | ||
} | ||
|
||
func testCtxErr(factory func(*testing.T) func(context.Context) error) func(t *testing.T) { | ||
return func(t *testing.T) { | ||
t.Helper() | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
t.Cleanup(cancel) | ||
|
||
t.Run("DeadlineExceeded", func(t *testing.T) { | ||
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) | ||
t.Cleanup(innerCancel) | ||
<-innerCtx.Done() | ||
|
||
f := factory(t) | ||
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded) | ||
}) | ||
|
||
t.Run("Canceled", func(t *testing.T) { | ||
innerCtx, innerCancel := context.WithCancel(ctx) | ||
innerCancel() | ||
|
||
f := factory(t) | ||
assert.ErrorIs(t, f(innerCtx), context.Canceled) | ||
}) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we aren't using the
context.Context
, do we need it in theNew()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, good question. I went both ways on this.
By including it we ...
New
, the trace HTTPNew
, and the trace gRPCNew
However, by including it, we are asking users to provide us something that is not used.
I'm open to opinions here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think passing the context for consistency (and possible future use) makes sense.