From f4b63f184d298d924f9ad630a24e51fc136bdac8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 3 Dec 2021 13:59:07 -0800 Subject: [PATCH] Use gRPC ClientConn for otlpmetricgrpc client connection handling (#2425) * Use gRPC ClientConn for otlpmetricgrpc conn handling * Update PR number * Update otlpmetrichttp to use retry package * Remove old commented code * Add all other external changes to changelog --- CHANGELOG.md | 8 + exporters/otlp/otlpmetric/go.mod | 1 - .../internal/connection/alignment_test.go | 38 -- .../internal/connection/connection.go | 431 ------------------ .../internal/connection/connection_test.go | 89 ---- .../otlpmetric/internal/otlpconfig/options.go | 71 +-- .../internal/otlpmetrictest/client.go | 16 + .../otlp/otlpmetric/internal/retry/retry.go | 137 ++++++ .../otlpmetric/internal/retry/retry_test.go | 197 ++++++++ .../otlp/otlpmetric/otlpmetricgrpc/client.go | 266 +++++++++-- .../otlpmetric/otlpmetricgrpc/client_test.go | 425 +---------------- .../otlpmetricgrpc/client_unit_test.go | 193 ++++++++ .../otlpmetricgrpc/mock_collector_test.go | 62 +-- .../otlp/otlpmetric/otlpmetricgrpc/options.go | 118 +++-- .../otlp/otlpmetric/otlpmetrichttp/client.go | 277 ++++++----- .../otlpmetric/otlpmetrichttp/client_test.go | 161 ------- .../otlpmetrichttp/client_unit_test.go | 68 +++ .../otlp/otlpmetric/otlpmetrichttp/go.mod | 1 - .../otlp/otlpmetric/otlpmetrichttp/go.sum | 1 + .../otlp/otlpmetric/otlpmetrichttp/options.go | 83 +++- 20 files changed, 1204 insertions(+), 1439 deletions(-) delete mode 100644 exporters/otlp/otlpmetric/internal/connection/alignment_test.go delete mode 100644 exporters/otlp/otlpmetric/internal/connection/connection.go delete mode 100644 exporters/otlp/otlpmetric/internal/connection/connection_test.go create mode 100644 exporters/otlp/otlpmetric/internal/retry/retry.go create mode 100644 exporters/otlp/otlpmetric/internal/retry/retry_test.go create mode 100644 exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 78799db5775..327bc3d001d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,11 +13,19 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added an internal Logger. This can be used by the SDK and API to provide users with feedback of the internal state. To enable verbose logs configure the logger which will print V(1) logs. For debugging information configure to print V(5) logs. (#2343) +- Add the `WithRetry` `Option` and the `RetryConfig` type to the `go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp` package to specify retry behavior consistently. (#2425) ### Changed - The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329) - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412) +- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. 
(#2425) +- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".RetrySettings` type is renamed to `RetryConfig`. (#2425) + +### Deprecated + +- Deprecated the `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp".WithMaxAttempts` `Option`, use the new `WithRetry` `Option` instead. (#2425) +- Deprecated the `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp".WithBackoff` `Option`, use the new `WithRetry` `Option` instead. (#2425) ### Removed diff --git a/exporters/otlp/otlpmetric/go.mod b/exporters/otlp/otlpmetric/go.mod index bebf80f77f3..287d571fac5 100644 --- a/exporters/otlp/otlpmetric/go.mod +++ b/exporters/otlp/otlpmetric/go.mod @@ -12,7 +12,6 @@ require ( go.opentelemetry.io/otel/sdk/export/metric v0.25.0 go.opentelemetry.io/otel/sdk/metric v0.25.0 go.opentelemetry.io/proto/otlp v0.11.0 - google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 google.golang.org/protobuf v1.27.1 ) diff --git a/exporters/otlp/otlpmetric/internal/connection/alignment_test.go b/exporters/otlp/otlpmetric/internal/connection/alignment_test.go deleted file mode 100644 index aad85902c28..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/alignment_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection - -import ( - "os" - "testing" - "unsafe" - - ottest "go.opentelemetry.io/otel/internal/internaltest" -) - -// Ensure struct alignment prior to running tests. -func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "Connection.lastConnectErrPtr", - Offset: unsafe.Offsetof(Connection{}.lastConnectErrPtr), - }, - } - if !ottest.Aligned8Byte(fields, os.Stderr) { - os.Exit(1) - } - - os.Exit(m.Run()) -} diff --git a/exporters/otlp/otlpmetric/internal/connection/connection.go b/exporters/otlp/otlpmetric/internal/connection/connection.go deleted file mode 100644 index dcb6db403fb..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/connection.go +++ /dev/null @@ -1,431 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package connection // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/connection" - -import ( - "context" - "fmt" - "math/rand" - "sync" - "sync/atomic" - "time" - "unsafe" - - "github.com/cenkalti/backoff/v4" - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" -) - -type Connection struct { - // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. - lastConnectErrPtr unsafe.Pointer - - // mu protects the Connection as it is accessed by the - // exporter goroutines and background Connection goroutine - mu sync.Mutex - cc *grpc.ClientConn - - // these fields are read-only after constructor is finished - cfg otlpconfig.Config - SCfg otlpconfig.SignalConfig - metadata metadata.MD - newConnectionHandler func(cc *grpc.ClientConn) - - // these channels are created once - disconnectedCh chan bool - backgroundConnectionDoneCh chan struct{} - stopCh chan struct{} - - // this is for tests, so they can replace the closing - // routine without a worry of modifying some global variable - // or changing it back to original after the test is done - closeBackgroundConnectionDoneCh func(ch chan struct{}) -} - -func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *Connection { - c := new(Connection) - c.newConnectionHandler = handler - c.cfg = cfg - c.SCfg = sCfg - if len(c.SCfg.Headers) > 0 { - c.metadata = metadata.New(c.SCfg.Headers) - } - c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - close(ch) - } - return c -} - -func (c *Connection) StartConnection(ctx context.Context) error { - c.stopCh = make(chan struct{}) - c.disconnectedCh = make(chan bool, 1) - c.backgroundConnectionDoneCh = make(chan struct{}) - - if err := c.connect(ctx); err == nil { - c.setStateConnected() - } else { - c.SetStateDisconnected(err) - } - go c.indefiniteBackgroundConnection() - - // TODO: proper error handling when initializing connections. - // We can report permanent errors, e.g., invalid settings. - return nil -} - -func (c *Connection) LastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) - if errPtr == nil { - return nil - } - return *errPtr -} - -func (c *Connection) saveLastConnectError(err error) { - var errPtr *error - if err != nil { - errPtr = &err - } - atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) -} - -func (c *Connection) SetStateDisconnected(err error) { - c.saveLastConnectError(err) - select { - case c.disconnectedCh <- true: - default: - } - c.newConnectionHandler(nil) -} - -func (c *Connection) setStateConnected() { - c.saveLastConnectError(nil) -} - -func (c *Connection) Connected() bool { - return c.LastConnectError() == nil -} - -const defaultConnReattemptPeriod = 10 * time.Second - -func (c *Connection) indefiniteBackgroundConnection() { - defer func() { - c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) - }() - - connReattemptPeriod := c.cfg.ReconnectionPeriod - if connReattemptPeriod <= 0 { - connReattemptPeriod = defaultConnReattemptPeriod - } - - // No strong seeding required, nano time can - // already help with pseudo uniqueness. 
- rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) - - // maxJitterNanos: 70% of the connectionReattemptPeriod - maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) - - for { - // Otherwise these will be the normal scenarios to enable - // reconnection if we trip out. - // 1. If we've stopped, return entirely - // 2. Otherwise block until we are disconnected, and - // then retry connecting - select { - case <-c.stopCh: - return - - case <-c.disconnectedCh: - // Quickly check if we haven't stopped at the - // same time. - select { - case <-c.stopCh: - return - - default: - } - - // Normal scenario that we'll wait for - } - - if err := c.connect(context.Background()); err == nil { - c.setStateConnected() - } else { - // this code is unreachable in most cases - // c.connect does not establish Connection - c.SetStateDisconnected(err) - } - - // Apply some jitter to avoid lockstep retrials of other - // collector-exporters. Lockstep retrials could result in an - // innocent DDOS, by clogging the machine's resources and network. - jitter := time.Duration(rng.Int63n(maxJitterNanos)) - select { - case <-c.stopCh: - return - case <-time.After(connReattemptPeriod + jitter): - } - } -} - -func (c *Connection) connect(ctx context.Context) error { - cc, err := c.dialToCollector(ctx) - if err != nil { - return err - } - c.setConnection(cc) - c.newConnectionHandler(cc) - return nil -} - -// setConnection sets cc as the client Connection and returns true if -// the Connection state changed. -func (c *Connection) setConnection(cc *grpc.ClientConn) bool { - c.mu.Lock() - defer c.mu.Unlock() - - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if c.cc == cc { - return false - } - - // If the previous clientConn was non-nil, close it - if c.cc != nil { - _ = c.cc.Close() - } - c.cc = cc - return true -} - -func (c *Connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - if c.cfg.GRPCConn != nil { - return c.cfg.GRPCConn, nil - } - - dialOpts := []grpc.DialOption{} - if c.cfg.ServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig)) - } - if c.SCfg.GRPCCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.SCfg.GRPCCredentials)) - } else if c.SCfg.Insecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if c.SCfg.Compression == otlpconfig.GzipCompression { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) - } - if len(c.cfg.DialOptions) != 0 { - dialOpts = append(dialOpts, c.cfg.DialOptions...) - } - - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - ctx = c.ContextWithMetadata(ctx) - return grpc.DialContext(ctx, c.SCfg.Endpoint, dialOpts...) 
-} - -func (c *Connection) ContextWithMetadata(ctx context.Context) context.Context { - if c.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, c.metadata) - } - return ctx -} - -func (c *Connection) Shutdown(ctx context.Context) error { - close(c.stopCh) - // Ensure that the backgroundConnector returns - select { - case <-c.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } - - c.mu.Lock() - cc := c.cc - c.cc = nil - c.mu.Unlock() - - if cc != nil { - return cc.Close() - } - - return nil -} - -func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { - // Unify the parent context Done signal with the Connection's - // stop channel. - ctx, cancel := context.WithCancel(ctx) - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - // Nothing to do, either cancelled or deadline - // happened. - case <-c.stopCh: - cancel() - } - }(ctx, cancel) - return ctx, cancel -} - -func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error { - expBackoff := newExponentialBackoff(c.cfg.RetrySettings) - - for { - err := fn(ctx) - if err == nil { - // request succeeded. - return nil - } - - if !c.cfg.RetrySettings.Enabled { - return err - } - - // We have an error, check gRPC status code. - st := status.Convert(err) - if st.Code() == codes.OK { - // Not really an error, still success. - return nil - } - - // Now, this is this a real error. - - if !shouldRetry(st.Code()) { - // It is not a retryable error, we should not retry. - return err - } - - // Need to retry. - - throttle := getThrottleDuration(st) - - backoffDelay := expBackoff.NextBackOff() - if backoffDelay == backoff.Stop { - // throw away the batch - err = fmt.Errorf("max elapsed time expired: %w", err) - return err - } - - var delay time.Duration - - if backoffDelay > throttle { - delay = backoffDelay - } else { - if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime { - err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err) - return err - } - - // Respect server throttling. - delay = throttle - } - - // back-off, but get interrupted when shutting down or request is cancelled or timed out. - err = func() error { - dt := time.NewTimer(delay) - defer dt.Stop() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-c.stopCh: - return fmt.Errorf("interrupted due to shutdown: %w", err) - case <-dt.C: - } - - return nil - }() - - if err != nil { - return err - } - - } -} - -func shouldRetry(code codes.Code) bool { - switch code { - case codes.OK: - // Success. This function should not be called for this code, the best we - // can do is tell the caller not to retry. - return false - - case codes.Canceled, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted, - codes.OutOfRange, - codes.Unavailable, - codes.DataLoss: - // These are retryable errors. - return true - - case codes.Unknown, - codes.InvalidArgument, - codes.Unauthenticated, - codes.PermissionDenied, - codes.NotFound, - codes.AlreadyExists, - codes.FailedPrecondition, - codes.Unimplemented, - codes.Internal: - // These are fatal errors, don't retry. - return false - - default: - // Don't retry on unknown codes. - return false - } -} - -func getThrottleDuration(status *status.Status) time.Duration { - // See if throttling information is available. 
- for _, detail := range status.Details() { - if t, ok := detail.(*errdetails.RetryInfo); ok { - if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 { - // We are throttled. Wait before retrying as requested by the server. - return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond - } - return 0 - } - } - return 0 -} - -func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff { - // Do not use NewExponentialBackOff since it calls Reset and the code here must - // call Reset after changing the InitialInterval (this saves an unnecessary call to Now). - expBackoff := &backoff.ExponentialBackOff{ - InitialInterval: rs.InitialInterval, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: rs.MaxInterval, - MaxElapsedTime: rs.MaxElapsedTime, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - expBackoff.Reset() - - return expBackoff -} diff --git a/exporters/otlp/otlpmetric/internal/connection/connection_test.go b/exporters/otlp/otlpmetric/internal/connection/connection_test.go deleted file mode 100644 index f842fbd48e7..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/connection_test.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package connection - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" -) - -func TestGetThrottleDuration(t *testing.T) { - tts := []struct { - stsFn func() (*status.Status, error) - throttle time.Duration - }{ - { - stsFn: func() (*status.Status, error) { - return status.New( - codes.OK, - "status with no retry info", - ), nil - }, - throttle: 0, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with retry info") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)}, - ) - }, - throttle: 15 * time.Millisecond, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info detail") - return st.WithDetails( - &errdetails.ErrorInfo{Reason: "no throttle detail"}, - ) - }, - throttle: 0, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info and retry info") - return st.WithDetails( - &errdetails.ErrorInfo{Reason: "no throttle detail"}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, - ) - }, - throttle: 13 * time.Minute, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with two retry info should take the first") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)}, - ) - }, - throttle: 13 * time.Minute, - }, - } - - for _, tt := range tts { - sts, _ := tt.stsFn() - t.Run(sts.Message(), func(t *testing.T) { - th := getThrottleDuration(sts) - require.Equal(t, tt.throttle, th) - }) - } -} diff --git a/exporters/otlp/otlpmetric/internal/otlpconfig/options.go b/exporters/otlp/otlpmetric/internal/otlpconfig/options.go index 4a9c326f412..485555364b9 100644 --- a/exporters/otlp/otlpmetric/internal/otlpconfig/options.go +++ b/exporters/otlp/otlpmetric/internal/otlpconfig/options.go @@ -20,7 +20,11 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding/gzip" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" ) const ( @@ -39,16 +43,6 @@ const ( DefaultTimeout time.Duration = 10 * time.Second ) -var ( - // defaultRetrySettings is a default settings for the retry policy. - defaultRetrySettings = RetrySettings{ - Enabled: true, - InitialInterval: 5 * time.Second, - MaxInterval: 30 * time.Second, - MaxElapsedTime: time.Minute, - } -) - type ( SignalConfig struct { Endpoint string @@ -67,16 +61,13 @@ type ( // Signal specific configurations Metrics SignalConfig - // HTTP configurations - MaxAttempts int - Backoff time.Duration + RetryConfig retry.Config // gRPC configurations ReconnectionPeriod time.Duration ServiceConfig string DialOptions []grpc.DialOption GRPCConn *grpc.ClientConn - RetrySettings RetrySettings } ) @@ -88,12 +79,46 @@ func NewDefaultConfig() Config { Compression: NoCompression, Timeout: DefaultTimeout, }, - RetrySettings: defaultRetrySettings, + RetryConfig: retry.DefaultConfig, } return c } +// NewGRPCConfig returns a new Config with all settings applied from opts and +// any unset setting using the default gRPC config values. 
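+//
+// Note that ApplyGRPCEnvConfigs runs before the explicit opts, so a
+// programmatic option such as WithEndpoint overrides the corresponding
+// OTEL_EXPORTER_OTLP_* environment variable. For example (a sketch using
+// options that exist in this package):
+//
+//	cfg := NewGRPCConfig(
+//		WithEndpoint("localhost:4317"),
+//		WithInsecure(),
+//		WithRetry(retry.DefaultConfig),
+//	)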
+func NewGRPCConfig(opts ...GRPCOption) Config {
+	cfg := NewDefaultConfig()
+	ApplyGRPCEnvConfigs(&cfg)
+	for _, opt := range opts {
+		opt.ApplyGRPCOption(&cfg)
+	}
+
+	if cfg.ServiceConfig != "" {
+		cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultServiceConfig(cfg.ServiceConfig))
+	}
+	if cfg.Metrics.GRPCCredentials != nil {
+		cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(cfg.Metrics.GRPCCredentials))
+	} else if cfg.Metrics.Insecure {
+		cfg.DialOptions = append(cfg.DialOptions, grpc.WithInsecure())
+	}
+	if cfg.Metrics.Compression == GzipCompression {
+		cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
+	}
+	if cfg.ReconnectionPeriod != 0 {
+		p := grpc.ConnectParams{
+			Backoff:           backoff.DefaultConfig,
+			MinConnectTimeout: cfg.ReconnectionPeriod,
+		}
+		cfg.DialOptions = append(cfg.DialOptions, grpc.WithConnectParams(p))
+	}
+
+	return cfg
+}
+
 type (
 	// GenericOption applies an option to the HTTP or gRPC driver.
 	GenericOption interface {
@@ -218,9 +243,9 @@ func WithURLPath(urlPath string) GenericOption {
 	})
 }
 
-func WithRetry(settings RetrySettings) GenericOption {
+func WithRetry(rc retry.Config) GenericOption {
 	return newGenericOption(func(cfg *Config) {
-		cfg.RetrySettings = settings
+		cfg.RetryConfig = rc
 	})
 }
 
@@ -255,15 +280,3 @@ func WithTimeout(duration time.Duration) GenericOption {
 		cfg.Metrics.Timeout = duration
 	})
 }
-
-func WithMaxAttempts(maxAttempts int) GenericOption {
-	return newGenericOption(func(cfg *Config) {
-		cfg.MaxAttempts = maxAttempts
-	})
-}
-
-func WithBackoff(duration time.Duration) GenericOption {
-	return newGenericOption(func(cfg *Config) {
-		cfg.Backoff = duration
-	})
-}
diff --git a/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
index fb29e3ede90..c248521ee14 100644
--- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
+++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
@@ -55,6 +55,14 @@ func initializeExporter(t *testing.T, client otlpmetric.Client) *otlpmetric.Expo
 }
 
 func testClientStopHonorsTimeout(t *testing.T, client otlpmetric.Client) {
+	t.Cleanup(func() {
+		// The test is looking for a failed shut down. Call Stop a second time
+		// with an un-expired context to give the client a second chance at
+		// cleaning up. There is no guarantee from the Client interface this
+		// will succeed, therefore, no need to check the error (just give it a
+		// best try).
+		_ = client.Stop(context.Background())
+	})
 	e := initializeExporter(t, client)
 
 	innerCtx, innerCancel := context.WithTimeout(context.Background(), time.Microsecond)
@@ -68,6 +76,14 @@ func testClientStopHonorsTimeout(t *testing.T, client otlpmetric.Client) {
 }
 
 func testClientStopHonorsCancel(t *testing.T, client otlpmetric.Client) {
+	t.Cleanup(func() {
+		// The test is looking for a failed shut down. Call Stop a second time
+		// with an un-expired context to give the client a second chance at
+		// cleaning up. There is no guarantee from the Client interface this
+		// will succeed, therefore, no need to check the error (just give it a
+		// best try).
+ _ = client.Stop(context.Background()) + }) e := initializeExporter(t, client) ctx, innerCancel := context.WithCancel(context.Background()) diff --git a/exporters/otlp/otlpmetric/internal/retry/retry.go b/exporters/otlp/otlpmetric/internal/retry/retry.go new file mode 100644 index 00000000000..830208f82f3 --- /dev/null +++ b/exporters/otlp/otlpmetric/internal/retry/retry.go @@ -0,0 +1,137 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" +) + +// DefaultConfig are the recommended defaults to use. +var DefaultConfig = Config{ + Enabled: true, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: time.Minute, +} + +// Config defines configuration for retrying batches in case of export failure +// using an exponential backoff. +type Config struct { + // Enabled indicates whether to not retry sending batches in case of + // export failure. + Enabled bool + // InitialInterval the time to wait after the first failure before + // retrying. + InitialInterval time.Duration + // MaxInterval is the upper bound on backoff interval. Once this value is + // reached the delay between consecutive retries will always be + // `MaxInterval`. + MaxInterval time.Duration + // MaxElapsedTime is the maximum amount of time (including retries) spent + // trying to send a request/batch. Once this value is reached, the data + // is discarded. + MaxElapsedTime time.Duration +} + +// RequestFunc wraps a request with retry logic. +type RequestFunc func(context.Context, func(context.Context) error) error + +// EvaluateFunc returns if an error is retry-able and if an explicit throttle +// duration should be honored that was included in the error. +type EvaluateFunc func(error) (bool, time.Duration) + +func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc { + if !c.Enabled { + return func(ctx context.Context, fn func(context.Context) error) error { + return fn(ctx) + } + } + + // Do not use NewExponentialBackOff since it calls Reset and the code here + // must call Reset after changing the InitialInterval (this saves an + // unnecessary call to Now). + b := &backoff.ExponentialBackOff{ + InitialInterval: c.InitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: c.MaxInterval, + MaxElapsedTime: c.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + return func(ctx context.Context, fn func(context.Context) error) error { + for { + err := fn(ctx) + if err == nil { + return nil + } + + retryable, throttle := evaluate(err) + if !retryable { + return err + } + + bOff := b.NextBackOff() + if bOff == backoff.Stop { + return fmt.Errorf("max retry time elapsed: %w", err) + } + + // Wait for the greater of the backoff or throttle delay. 
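+			// A server-supplied throttle (from evaluate) only wins over the
+			// locally computed backoff when it is longer, and only when
+			// honoring it would not exceed MaxElapsedTime.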
+ var delay time.Duration + if bOff > throttle { + delay = bOff + } else { + elapsed := b.GetElapsedTime() + if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime { + return fmt.Errorf("max retry time would elapse: %w", err) + } + delay = throttle + } + + if err := waitFunc(ctx, delay); err != nil { + return err + } + } + } +} + +// Allow override for testing. +var waitFunc = wait + +func wait(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + // Handle the case where the timer and context deadline end + // simultaneously by prioritizing the timer expiration nil value + // response. + select { + case <-timer.C: + default: + return ctx.Err() + } + case <-timer.C: + } + + return nil +} diff --git a/exporters/otlp/otlpmetric/internal/retry/retry_test.go b/exporters/otlp/otlpmetric/internal/retry/retry_test.go new file mode 100644 index 00000000000..d2b5b6c4b59 --- /dev/null +++ b/exporters/otlp/otlpmetric/internal/retry/retry_test.go @@ -0,0 +1,197 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestWait(t *testing.T) { + tests := []struct { + ctx context.Context + delay time.Duration + expected error + }{ + { + ctx: context.Background(), + delay: time.Duration(0), + expected: nil, + }, + { + ctx: context.Background(), + delay: time.Duration(1), + expected: nil, + }, + { + ctx: context.Background(), + delay: time.Duration(-1), + expected: nil, + }, + { + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }(), + // Ensure the timer and context do not end simultaneously. + delay: 1 * time.Hour, + expected: context.Canceled, + }, + } + + for _, test := range tests { + assert.Equal(t, test.expected, wait(test.ctx, test.delay)) + } +} + +func TestNonRetryableError(t *testing.T) { + ev := func(error) (bool, time.Duration) { return false, 0 } + + reqFunc := Config{ + Enabled: true, + InitialInterval: 1 * time.Nanosecond, + MaxInterval: 1 * time.Nanosecond, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} + +func TestThrottledRetry(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + throttleDelay, backoffDelay := time.Second, time.Nanosecond + + ev := func(error) (bool, time.Duration) { + // Retry everything with a throttle delay. + return true, throttleDelay + } + + reqFunc := Config{ + Enabled: true, + InitialInterval: backoffDelay, + MaxInterval: backoffDelay, + // Never stop retrying. 
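+		// (The backoff package treats a zero MaxElapsedTime as no limit.)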
+ MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, delay time.Duration) error { + assert.Equal(t, throttleDelay, delay, "retry not throttled") + // Try twice to ensure call is attempted again after delay. + if done { + return assert.AnError + } + done = true + return nil + } + defer func() { waitFunc = origWait }() + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestBackoffRetry(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + InitialInterval: delay, + MaxInterval: delay, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, d time.Duration) error { + assert.Equal(t, delay, d, "retry not backoffed") + // Try twice to ensure call is attempted again after delay. + if done { + return assert.AnError + } + done = true + return nil + } + defer func() { waitFunc = origWait }() + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + tDelay, bDelay := time.Hour, time.Nanosecond + ev := func(error) (bool, time.Duration) { return true, tDelay } + reqFunc := Config{ + Enabled: true, + InitialInterval: bDelay, + MaxInterval: bDelay, + MaxElapsedTime: tDelay - (time.Nanosecond), + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time would elapse: ") +} + +func TestMaxElapsedTime(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + // InitialInterval > MaxElapsedTime means immediate return. 
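+		// (NextBackOff returns backoff.Stop once the elapsed time plus the
+		// next interval would exceed MaxElapsedTime.)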
+ InitialInterval: 2 * delay, + MaxElapsedTime: delay, + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time elapsed: ") +} + +func TestRetryNotEnabled(t *testing.T) { + ev := func(error) (bool, time.Duration) { + t.Error("evaluated retry when not enabled") + return false, 0 + } + + reqFunc := Config{}.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index c70be399fc7..9192930e22a 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -17,91 +17,259 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" "errors" - "fmt" "sync" + "time" + "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/connection" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) type client struct { - connection *connection.Connection + endpoint string + dialOpts []grpc.DialOption + metadata metadata.MD + exportTimeout time.Duration + requestFunc retry.RequestFunc - lock sync.Mutex - metricsClient colmetricpb.MetricsServiceClient + // stopCtx is used as a parent context for all exports. Therefore, when it + // is canceled with the stopFunc all exports are canceled. + stopCtx context.Context + // stopFunc cancels stopCtx, stopping any active exports. + stopFunc context.CancelFunc + + // ourConn keeps track of where conn was created: true if created here on + // Start, or false if passed with an option. This is important on Shutdown + // as the conn should only be closed if created here on start. Otherwise, + // it is up to the processes that passed the conn to close it. + ourConn bool + conn *grpc.ClientConn + mscMu sync.RWMutex + msc colmetricpb.MetricsServiceClient } -var ( - errNoClient = errors.New("no client") -) +// Compile time check *client implements otlpmetric.Client. +var _ otlpmetric.Client = (*client)(nil) // NewClient creates a new gRPC metric client. func NewClient(opts ...Option) otlpmetric.Client { - cfg := otlpconfig.NewDefaultConfig() - otlpconfig.ApplyGRPCEnvConfigs(&cfg) - for _, opt := range opts { - opt.applyGRPCOption(&cfg) + return newClient(opts...) +} + +func newClient(opts ...Option) *client { + cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...) 
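+
+	// cfg now merges the package defaults, any OTEL_EXPORTER_OTLP_*
+	// environment variables, and the supplied opts, including the fully
+	// assembled gRPC DialOptions.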
+ + ctx, cancel := context.WithCancel(context.Background()) + + c := &client{ + endpoint: cfg.Metrics.Endpoint, + exportTimeout: cfg.Metrics.Timeout, + requestFunc: cfg.RetryConfig.RequestFunc(retryable), + dialOpts: cfg.DialOptions, + stopCtx: ctx, + stopFunc: cancel, + conn: cfg.GRPCConn, } - c := &client{} - c.connection = connection.NewConnection(cfg, cfg.Metrics, c.handleNewConnection) + if len(cfg.Metrics.Headers) > 0 { + c.metadata = metadata.New(cfg.Metrics.Headers) + } return c } -func (c *client) handleNewConnection(cc *grpc.ClientConn) { - c.lock.Lock() - defer c.lock.Unlock() - if cc != nil { - c.metricsClient = colmetricpb.NewMetricsServiceClient(cc) - } else { - c.metricsClient = nil +// Start establishes a gRPC connection to the collector. +func (c *client) Start(ctx context.Context) error { + if c.conn == nil { + // If the caller did not provide a ClientConn when the client was + // created, create one using the configuration they did provide. + conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...) + if err != nil { + return err + } + // Keep track that we own the lifecycle of this conn and need to close + // it on Shutdown. + c.ourConn = true + c.conn = conn } -} -// Start establishes a connection to the collector. -func (c *client) Start(ctx context.Context) error { - return c.connection.StartConnection(ctx) + // The otlpmetric.Client interface states this method is called just once, + // so no need to check if already started. + c.mscMu.Lock() + c.msc = colmetricpb.NewMetricsServiceClient(c.conn) + c.mscMu.Unlock() + + return nil } -// Stop shuts down the connection to the collector. +var errAlreadyStopped = errors.New("the client is already stopped") + +// Stop shuts down the client. +// +// Any active connections to a remote endpoint are closed if they were created +// by the client. Any gRPC connection passed during creation using +// WithGRPCConn will not be closed. It is the caller's responsibility to +// handle cleanup of that resource. +// +// This method synchronizes with the UploadMetrics method of the client. It +// will wait for any active calls to that method to complete unimpeded, or it +// will cancel any active calls if ctx expires. If ctx expires, the context +// error will be forwarded as the returned error. All client held resources +// will still be released in this situation. +// +// If the client has already stopped, an error will be returned describing +// this. func (c *client) Stop(ctx context.Context) error { - return c.connection.Shutdown(ctx) + // Acquire the c.mscMu lock within the ctx lifetime. + acquired := make(chan struct{}) + go func() { + c.mscMu.Lock() + close(acquired) + }() + var err error + select { + case <-ctx.Done(): + // The Stop timeout is reached. Kill any remaining exports to force + // the clear of the lock and save the timeout error to return and + // signal the shutdown timed out before cleanly stopping. + c.stopFunc() + err = ctx.Err() + + // To ensure the client is not left in a dirty state c.msc needs to be + // set to nil. To avoid the race condition when doing this, ensure + // that all the exports are killed (initiated by c.stopFunc). + <-acquired + case <-acquired: + } + // Hold the mscMu lock for the rest of the function to ensure no new + // exports are started. + defer c.mscMu.Unlock() + + // The otlpmetric.Client interface states this method is called only + // once, but there is no guarantee it is called after Start. 
Ensure the
+	// client is started before doing anything and let the caller know if they
+	// made a mistake.
+	if c.msc == nil {
+		return errAlreadyStopped
+	}
+
+	// Clear c.msc to signal the client is stopped.
+	c.msc = nil
+
+	if c.ourConn {
+		closeErr := c.conn.Close()
+		// A context timeout error takes precedence over this error.
+		if err == nil && closeErr != nil {
+			err = closeErr
+		}
+	}
+	return err
 }
 
-// UploadMetrics sends a batch of metrics to the collector.
+var errShutdown = errors.New("the client is shutdown")
+
+// UploadMetrics sends a batch of metrics.
+//
+// Retryable errors from the server will be handled according to any
+// RetryConfig the client was created with.
 func (c *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
-	if !c.connection.Connected() {
-		return fmt.Errorf("metrics exporter is disconnected from the server %s: %w", c.connection.SCfg.Endpoint, c.connection.LastConnectError())
+	// Hold a read lock to ensure a shut down initiated after this starts does
+	// not abandon the export. This read lock acquire has less priority than a
+	// write lock acquire (i.e. Stop), meaning if the client is shutting down
+	// this will come after the shut down.
+	c.mscMu.RLock()
+	defer c.mscMu.RUnlock()
+
+	if c.msc == nil {
+		return errShutdown
 	}
 
-	ctx, cancel := c.connection.ContextWithStop(ctx)
+	ctx, cancel := c.exportContext(ctx)
 	defer cancel()
 
-	ctx, tCancel := context.WithTimeout(ctx, c.connection.SCfg.Timeout)
-	defer tCancel()
-
-	ctx = c.connection.ContextWithMetadata(ctx)
-	err := func() error {
-		c.lock.Lock()
-		defer c.lock.Unlock()
-		if c.metricsClient == nil {
-			return errNoClient
-		}
-		return c.connection.DoRequest(ctx, func(ctx context.Context) error {
-			_, err := c.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
-				ResourceMetrics: protoMetrics,
-			})
-			return err
+	return c.requestFunc(ctx, func(iCtx context.Context) error {
+		_, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
+			ResourceMetrics: protoMetrics,
 		})
+		// nil is converted to OK.
+		if status.Code(err) == codes.OK {
+			// Success.
+			return nil
+		}
+		return err
+	})
+}
+
+// exportContext returns a copy of parent with an appropriate deadline and
+// cancellation function.
+//
+// It is the caller's responsibility to cancel the returned context once its
+// use is complete, via the parent or directly with the returned CancelFunc, to
+// ensure all resources are correctly released.
+func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+	)
+
+	if c.exportTimeout > 0 {
+		ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
+	} else {
+		ctx, cancel = context.WithCancel(parent)
+	}
+
+	if c.metadata.Len() > 0 {
+		ctx = metadata.NewOutgoingContext(ctx, c.metadata)
+	}
+
+	// Unify the client stopCtx with the parent.
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-c.stopCtx.Done():
+			// Cancel the export as the shutdown has timed out.
+			cancel()
+		}
 	}()
-	if err != nil {
-		c.connection.SetStateDisconnected(err)
+
+	return ctx, cancel
+}
+
+// retryable returns if err identifies a request that can be retried and a
+// duration to wait for if an explicit throttle time is included in err.
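+// The codes treated as retryable follow the OTLP specification's guidance
+// for OTLP/gRPC exporters.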
+func retryable(err error) (bool, time.Duration) {
+	s := status.Convert(err)
+	switch s.Code() {
+	case codes.Canceled,
+		codes.DeadlineExceeded,
+		codes.ResourceExhausted,
+		codes.Aborted,
+		codes.OutOfRange,
+		codes.Unavailable,
+		codes.DataLoss:
+		return true, throttleDelay(s)
 	}
-	return err
+
+	// Not a retry-able error.
+	return false, 0
+}
+
+// throttleDelay returns a duration to wait for if an explicit throttle time
+// is included in the response status.
+func throttleDelay(status *status.Status) time.Duration {
+	for _, detail := range status.Details() {
+		if t, ok := detail.(*errdetails.RetryInfo); ok {
+			return t.RetryDelay.AsDuration()
+		}
+	}
+	return 0
+}
diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
index bf55eccfb8b..dbd8f47998b 100644
--- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
+++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
@@ -24,12 +24,10 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/encoding/gzip"
 	"google.golang.org/grpc/status"
-	"google.golang.org/protobuf/types/known/durationpb"
 
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
@@ -165,386 +163,6 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
 	}
 }
 
-func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
-	// TODO: Fix this test #1527
-	t.Skip("This test is flaky and needs to be rewritten")
-	mc := runMockCollector(t)
-
-	reconnectionPeriod := 20 * time.Millisecond
-	ctx := context.Background()
-	exp := newGRPCExporter(t, ctx, mc.endpoint,
-		otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}),
-		otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod))
-	defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
-
-	// Wait for a connection.
-	mc.ln.WaitForConn()
-
-	// We'll now stop the collector right away to simulate a connection
-	// dying in the midst of communication or even not existing before.
- require.NoError(t, mc.stop()) - - // first export, it will send disconnected message to the channel on export failure, - // trigger almost immediate reconnection - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // second export, it will detect connection issue, change state of exporter to disconnected and - // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // as a result we have exporter in disconnected state waiting for disconnection message to reconnect - - // resurrect collector - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // make sure reconnection loop hits beginning and goes back to waiting mode - // after hitting beginning of the loop it should reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - // when disconnected exp.Export doesnt send disconnected messages again - // it just quits and return last connection error - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - } - - nmaMetrics := nmc.getMetrics() - - if g, w := len(nmaMetrics), n; g != w { - t.Fatalf("Connected collector: metrics: got %d want %d", g, w) - } - - dMetrics := mc.getMetrics() - // Expecting 0 metrics to have been received by the original but now dead collector - if g, w := len(dMetrics), 0; g != w { - t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) - } - - require.NoError(t, nmc.Stop()) -} - -func TestExporterExportFailureAndRecoveryModes(t *testing.T) { - tts := []struct { - name string - errors []error - rs otlpmetricgrpc.RetrySettings - fn func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) - opts []otlpmetricgrpc.Option - }{ - { - name: "Do not retry if succeeded", - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 success request.") - }, - }, - { - name: "Do not retry if 'error' is ok", - errors: []error{ - status.Error(codes.OK, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error OK request.") - }, - }, - { - name: "Fail three times and succeed", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, - MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 4, mc.metricSvc.requests, "metric service must receive 3 failure requests and 1 success request.") - }, - }, - { - name: "Permanent error should not be retried", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, 
- MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.InvalidArgument, "invalid arguments"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - metric := mc.getMetrics() - - require.Len(t, metric, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error requests.") - }, - }, - { - name: "Test all transient errors and succeed", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 500 * time.Millisecond, - InitialInterval: 1 * time.Millisecond, - MaxInterval: 2 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Canceled, ""), - status.Error(codes.DeadlineExceeded, ""), - status.Error(codes.ResourceExhausted, ""), - status.Error(codes.Aborted, ""), - status.Error(codes.OutOfRange, ""), - status.Error(codes.Unavailable, ""), - status.Error(codes.DataLoss, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 8, mc.metricSvc.requests, "metric service must receive 9 failure requests and 1 success request.") - }, - }, - { - name: "Retry should honor server throttling", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Minute, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - opts: []otlpmetricgrpc.Option{ - otlpmetricgrpc.WithTimeout(time.Millisecond * 100), - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Second*30), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Equal(t, "context deadline exceeded", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry should fail if server throttling is higher than the MaxElapsedTime", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Minute), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry stops if takes too long", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Millisecond * 50, - MaxInterval: time.Millisecond * 50, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, 
exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - - require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.LessOrEqual(t, 1, mc.metricSvc.requests, "metric service must receive at least 1 failure requests.") - }, - }, - { - name: "Disabled retry", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: false, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - - require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests.") - }, - }, - } - - for _, tt := range tts { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: tt.errors, - }) - - opts := []otlpmetricgrpc.Option{ - otlpmetricgrpc.WithRetry(tt.rs), - } - - if len(tt.opts) != 0 { - opts = append(opts, tt.opts...) - } - - exp := newGRPCExporter(t, ctx, mc.endpoint, opts...) - - tt.fn(t, ctx, exp, mc) - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } - -} - -func TestPermanentErrorsShouldNotBeRetried(t *testing.T) { - permanentErrors := []*status.Status{ - status.New(codes.Unknown, "Unknown"), - status.New(codes.InvalidArgument, "InvalidArgument"), - status.New(codes.NotFound, "NotFound"), - status.New(codes.AlreadyExists, "AlreadyExists"), - status.New(codes.FailedPrecondition, "FailedPrecondition"), - status.New(codes.Unimplemented, "Unimplemented"), - status.New(codes.Internal, "Internal"), - status.New(codes.PermissionDenied, ""), - status.New(codes.Unauthenticated, ""), - } - - for _, sts := range permanentErrors { - t.Run(sts.Code().String(), func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: []error{sts.Err()}, - }) - - exp := newGRPCExporter(t, ctx, mc.endpoint) - - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Len(t, mc.getMetrics(), 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 permanent error requests.") - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } -} - -func newThrottlingError(code codes.Code, duration time.Duration) error { - s := status.New(code, "") - - s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)}) - - return s.Err() -} - -func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 50 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}), - otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod)) - defer func() { require.NoError(t, exp.Shutdown(ctx)) }() - - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. 
- require.NoError(t, mc.stop()) - - // In the test below, we'll stop the collector many times, - // while exporting metrics and test to ensure that we can - // reconnect. - for j := 0; j < 3; j++ { - - // No endpoint up. - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // Now resurrect the collector by making a new one but reusing the - // old endpoint, and the collector should reconnect automatically. - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // Give the exporter sometime to reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - } - - nmaMetrics := nmc.getMetrics() - // Expecting 10 metrics that were sampled, given that - if g, w := len(nmaMetrics), n; g != w { - t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) - } - - dMetrics := mc.getMetrics() - // Expecting 0 metrics to have been received by the original but now dead collector - if g, w := len(dMetrics), 0; g != w { - t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) - } - - // Disconnect for the next try. - require.NoError(t, nmc.stop()) - } -} - // This test takes a long time to run: to skip it, run tests using: -short func TestNewExporter_collectorOnBadConnection(t *testing.T) { if testing.Short() { @@ -641,7 +259,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false})) + exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{Enabled: false})) defer func() { _ = exp.Shutdown(ctx) }() @@ -662,48 +280,31 @@ func TestNewExporter_WithTimeout(t *testing.T) { } } -func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) { +func TestStartErrorInvalidSecurityConfiguration(t *testing.T) { mc := runMockCollector(t) defer func() { _ = mc.stop() }() - ctx := context.Background() client := otlpmetricgrpc.NewClient(otlpmetricgrpc.WithEndpoint(mc.endpoint)) - exp, err := otlpmetric.New(ctx, client) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - - err = exp.Export(ctx, testResource, oneRecord) - - expectedErr := fmt.Sprintf("metrics exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) - - require.Error(t, err) - require.Equal(t, expectedErr, err.Error()) - - defer func() { - _ = exp.Shutdown(ctx) - }() + err := client.Start(context.Background()) + // https://github.com/grpc/grpc-go/blob/a671967dfbaab779d37fd7e597d9248f13806087/clientconn.go#L82 + assert.EqualError(t, err, "grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") } -func TestDisconnected(t *testing.T) { - ctx := context.Background() - // The endpoint is whatever, we want to be disconnected. But we - // setting a blocking connection, so dialing to the invalid - // endpoint actually fails. - exp := newGRPCExporter(t, ctx, "invalid", - otlpmetricgrpc.WithReconnectionPeriod(time.Hour), +func TestStartErrorInvalidAddress(t *testing.T) { + client := otlpmetricgrpc.NewClient( + otlpmetricgrpc.WithInsecure(), + // Validate the connection in Start (which should return the error). 
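+		// grpc.WithBlock below makes the dial in Start synchronous, and
+		// grpc.FailOnNonTempDialError makes it report permanent dial
+		// errors (such as this malformed address) immediately instead
+		// of retrying them in the background.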
otlpmetricgrpc.WithDialOption( grpc.WithBlock(), grpc.FailOnNonTempDialError(true), ), + otlpmetricgrpc.WithEndpoint("invalid"), + otlpmetricgrpc.WithReconnectionPeriod(time.Hour), ) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.Error(t, exp.Export(ctx, testResource, oneRecord)) + err := client.Start(context.Background()) + assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`) } func TestEmptyData(t *testing.T) { diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go new file mode 100644 index 00000000000..ccd4ade1389 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpmetricgrpc + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" +) + +func TestThrottleDuration(t *testing.T) { + c := codes.ResourceExhausted + testcases := []struct { + status *status.Status + expected time.Duration + }{ + { + status: status.New(c, "no retry info"), + expected: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "single retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Millisecond), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 15 * time.Millisecond, + }, + { + status: func() *status.Status { + s, err := status.New(c, "error info").WithDetails( + &errdetails.ErrorInfo{Reason: "no throttle detail"}, + ) + require.NoError(t, err) + return s + }(), + expected: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "error and retry info").WithDetails( + &errdetails.ErrorInfo{Reason: "with throttle detail"}, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, + }, + { + status: func() *status.Status { + s, err := status.New(c, "double retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, + }, + } + + for _, tc := range testcases { + t.Run(tc.status.Message(), func(t *testing.T) { + require.Equal(t, tc.expected, throttleDelay(tc.status)) + }) + } +} + +func TestRetryable(t *testing.T) { + retryableCodes := map[codes.Code]bool{ + codes.OK: false, + codes.Canceled: true, + codes.Unknown: false, + codes.InvalidArgument: false, + codes.DeadlineExceeded: true, + codes.NotFound: false, + codes.AlreadyExists: false, + 
codes.PermissionDenied:  false,
+		codes.ResourceExhausted: true,
+		codes.FailedPrecondition: false,
+		codes.Aborted:            true,
+		codes.OutOfRange:         true,
+		codes.Unimplemented:      false,
+		codes.Internal:           false,
+		codes.Unavailable:        true,
+		codes.DataLoss:           true,
+		codes.Unauthenticated:    false,
+	}
+
+	for c, want := range retryableCodes {
+		got, _ := retryable(status.Error(c, ""))
+		assert.Equalf(t, want, got, "retryable(%s)", c)
+	}
+}
+
+func TestUnstartedStop(t *testing.T) {
+	client := NewClient()
+	assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)
+}
+
+func TestUnstartedUploadMetric(t *testing.T) {
+	client := NewClient()
+	assert.ErrorIs(t, client.UploadMetrics(context.Background(), nil), errShutdown)
+}
+
+func TestExportContextHonorsParentDeadline(t *testing.T) {
+	now := time.Now()
+	ctx, cancel := context.WithDeadline(context.Background(), now)
+	t.Cleanup(cancel)
+
+	// Without a client timeout, the parent deadline should be used.
+	client := newClient(WithTimeout(0))
+	eCtx, eCancel := client.exportContext(ctx)
+	t.Cleanup(eCancel)
+
+	deadline, ok := eCtx.Deadline()
+	assert.True(t, ok, "deadline not propagated to child context")
+	assert.Equal(t, now, deadline)
+}
+
+func TestExportContextHonorsClientTimeout(t *testing.T) {
+	// Setting a timeout should ensure a deadline is set on the context.
+	client := newClient(WithTimeout(1 * time.Second))
+	ctx, cancel := client.exportContext(context.Background())
+	t.Cleanup(cancel)
+
+	_, ok := ctx.Deadline()
+	assert.True(t, ok, "timeout not set as deadline for child context")
+}
+
+func TestExportContextLinksStopSignal(t *testing.T) {
+	rootCtx := context.Background()
+
+	client := newClient(WithInsecure())
+	t.Cleanup(func() { require.NoError(t, client.Stop(rootCtx)) })
+	require.NoError(t, client.Start(rootCtx))
+
+	ctx, cancel := client.exportContext(rootCtx)
+	t.Cleanup(cancel)
+
+	require.False(t, func() bool {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+		}
+		return false
+	}(), "context should not be done prior to canceling it")
+
+	// The client.stopFunc cancels the client.stopCtx. This should have been
+	// set up as a parent of ctx. Therefore, it should cancel ctx as well.
+	client.stopFunc()
+
+	// Assert this with Eventually to account for goroutine scheduler timing.
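+	// The stop signal propagates on a separate goroutine, so ctx.Done()
+	// may not be closed the instant stopFunc returns.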
+ assert.Eventually(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }, 10*time.Second, time.Microsecond) +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go index ea28c1a7fad..3eecfef39c0 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "net" - "runtime" - "strings" "sync" "testing" "time" @@ -94,7 +92,6 @@ type mockCollector struct { metricSvc *mockMetricService endpoint string - ln *listener stopFunc func() stopOnce sync.Once } @@ -160,9 +157,8 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle srv := grpc.NewServer() mc := makeMockCollector(t, mockConfig) collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) - mc.ln = newListener(ln) go func() { - _ = srv.Serve((net.Listener)(mc.ln)) + _ = srv.Serve(ln) }() mc.endpoint = ln.Addr().String() @@ -171,59 +167,3 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle return mc } - -type listener struct { - closeOnce sync.Once - wrapped net.Listener - C chan struct{} -} - -func newListener(wrapped net.Listener) *listener { - return &listener{ - wrapped: wrapped, - C: make(chan struct{}, 1), - } -} - -func (l *listener) Close() error { return l.wrapped.Close() } - -func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } - -// Accept waits for and returns the next connection to the listener. It will -// send a signal on l.C that a connection has been made before returning. -func (l *listener) Accept() (net.Conn, error) { - conn, err := l.wrapped.Accept() - if err != nil { - // Go 1.16 exported net.ErrClosed that could clean up this check, but to - // remain backwards compatible with previous versions of Go that we - // support the following string evaluation is used instead to keep in line - // with the previously recommended way to check this: - // https://github.com/golang/go/issues/4373#issuecomment-353076799 - if strings.Contains(err.Error(), "use of closed network connection") { - // If the listener has been closed, do not allow callers of - // WaitForConn to wait for a connection that will never come. - l.closeOnce.Do(func() { close(l.C) }) - } - return conn, err - } - - select { - case l.C <- struct{}{}: - default: - // If C is full, assume nobody is listening and move on. - } - return conn, nil -} - -// WaitForConn will wait indefintely for a connection to be estabilished with -// the listener before returning. -func (l *listener) WaitForConn() { - for { - select { - case <-l.C: - return - default: - runtime.Gosched() - } - } -} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go index 1145fdc9f5e..685e08aeb67 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go @@ -23,16 +23,28 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" ) -// Option applies an option to the gRPC client. +// Option applies an option to the gRPC driver. type Option interface { applyGRPCOption(*otlpconfig.Config) } -// RetrySettings defines configuration for retrying batches in case of export failure -// using an exponential backoff. 
-type RetrySettings otlpconfig.RetrySettings
+func asGRPCOptions(opts []Option) []otlpconfig.GRPCOption {
+	converted := make([]otlpconfig.GRPCOption, len(opts))
+	for i, o := range opts {
+		converted[i] = otlpconfig.NewGRPCOption(o.applyGRPCOption)
+	}
+	return converted
+}
+
+// RetryConfig defines configuration for retrying export of metric batches
+// that failed to be received by the target endpoint.
+//
+// This configuration does not define any network retry strategy. That is
+// entirely handled by the gRPC ClientConn.
+type RetryConfig retry.Config
 
 type wrappedOption struct {
 	otlpconfig.GRPCOption
@@ -42,22 +54,28 @@ func (w wrappedOption) applyGRPCOption(cfg *otlpconfig.Config) {
 	w.ApplyGRPCOption(cfg)
 }
 
-// WithInsecure disables client transport security for the exporter's gRPC connection
-// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
-// does. Note, by default, client security is required unless WithInsecure is used.
+// WithInsecure disables client transport security for the exporter's gRPC
+// connection just like grpc.WithInsecure()
+// (https://pkg.go.dev/google.golang.org/grpc#WithInsecure) does. Note, by
+// default, client security is required unless WithInsecure is used.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithInsecure() Option {
 	return wrappedOption{otlpconfig.WithInsecure()}
 }
 
-// WithEndpoint allows one to set the endpoint that the exporter will
-// connect to the collector on. If unset, it will instead try to use
-// connect to DefaultCollectorHost:DefaultCollectorPort.
+// WithEndpoint sets the target endpoint the exporter will connect to. If
+// unset, localhost:4317 will be used as a default.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithEndpoint(endpoint string) Option {
 	return wrappedOption{otlpconfig.WithEndpoint(endpoint)}
 }
 
-// WithReconnectionPeriod allows one to set the delay between next connection attempt
-// after failing to connect with the collector.
+// WithReconnectionPeriod sets the minimum amount of time between connection
+// attempts to the target endpoint.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithReconnectionPeriod(rp time.Duration) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.ReconnectionPeriod = rp
@@ -74,25 +92,30 @@ func compressorToCompression(compressor string) otlpconfig.Compression {
 	return otlpconfig.NoCompression
 }
 
-// WithCompressor will set the compressor for the gRPC client to use when sending requests.
-// It is the responsibility of the caller to ensure that the compressor set has been registered
-// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
-// compressors auto-register on import, such as gzip, which can be registered by calling
+// WithCompressor sets the compressor for the gRPC client to use when sending
+// requests. It is the responsibility of the caller to ensure that the
+// compressor set has been registered with google.golang.org/grpc/encoding.
+// This can be done by encoding.RegisterCompressor. Some compressors
+// auto-register on import, such as gzip, which can be registered by calling
 // `import _ "google.golang.org/grpc/encoding/gzip"`.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithCompressor(compressor string) Option {
 	return wrappedOption{otlpconfig.WithCompression(compressorToCompression(compressor))}
 }
 
-// WithHeaders will send the provided headers with gRPC requests.
+// WithHeaders will send the provided headers with each gRPC request.
 func WithHeaders(headers map[string]string) Option {
 	return wrappedOption{otlpconfig.WithHeaders(headers)}
 }
 
-// WithTLSCredentials allows the connection to use TLS credentials
-// when talking to the server. It takes in grpc.TransportCredentials instead
-// of say a Certificate file or a tls.Certificate, because the retrieving of
-// these credentials can be done in many ways e.g. plain file, in code tls.Config
-// or by certificate rotation, so it is up to the caller to decide what to use.
+// WithTLSCredentials allows the connection to use TLS credentials when
+// talking to the server. It takes in grpc.TransportCredentials instead of say
+// a Certificate file or a tls.Certificate, because the retrieving of these
+// credentials can be done in many ways e.g. plain file, in code tls.Config or
+// by certificate rotation, so it is up to the caller to decide what to use.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithTLSCredentials(creds credentials.TransportCredentials) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.Metrics.GRPCCredentials = creds
@@ -100,40 +123,63 @@ func WithTLSCredentials(creds credentials.TransportCredentials) Option {
 }
 
 // WithServiceConfig defines the default gRPC service config used.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithServiceConfig(serviceConfig string) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.ServiceConfig = serviceConfig
 	})}
 }
 
-// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts
-// with some other configuration the GRPC specified via the collector the ones here will
-// take preference since they are set last.
+// WithDialOption sets explicit grpc.DialOptions to use when making a
+// connection. The options here are appended to the internal grpc.DialOptions
+// used so they will take precedence over any other internal grpc.DialOptions
+// they might conflict with.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithDialOption(opts ...grpc.DialOption) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.DialOptions = opts
 	})}
 }
 
-// WithGRPCConn allows reusing existing gRPC connection when it has already been
-// established for other services. When set, other dial options will be ignored.
+// WithGRPCConn sets conn as the gRPC ClientConn used for all communication.
+//
+// This option takes precedence over any other option that relates to
+// establishing or persisting a gRPC connection to a target endpoint. Any
+// other option of those types passed will be ignored.
+//
+// It is the caller's responsibility to close the passed conn. The client
+// Shutdown method will not close this connection.
 func WithGRPCConn(conn *grpc.ClientConn) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.GRPCConn = conn
	})}
 }
 
-// WithTimeout tells the client the max waiting time for the backend to process
-// each metrics batch. If unset, the default will be 10 seconds.
+// WithTimeout sets the max amount of time a client will attempt to export a
+// batch of metrics. This takes precedence over any retry settings defined
+// with WithRetry; once this time limit has been reached the export is
+// abandoned and the batch of metrics is dropped.
+//
+// If unset, the default timeout will be set to 10 seconds.
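+//
+// For example, WithTimeout(5*time.Second) abandons an export, including
+// all of its retries, if it has not succeeded within five seconds.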
func WithTimeout(duration time.Duration) Option {
 	return wrappedOption{otlpconfig.WithTimeout(duration)}
 }
 
-// WithRetry configures the retry policy for transient errors that may occurs when
-// exporting metrics. An exponential back-off algorithm is used to
-// ensure endpoints are not overwhelmed with retries. If unset, the default
-// retry policy will retry after 5 seconds and increase exponentially after each
-// error for a total of 1 minute.
-func WithRetry(settings RetrySettings) Option {
-	return wrappedOption{otlpconfig.WithRetry(otlpconfig.RetrySettings(settings))}
+// WithRetry sets the retry policy for transient retryable errors that may be
+// returned by the target endpoint when exporting a batch of metrics.
+//
+// If the target endpoint responds with not only a retryable error, but
+// explicitly returns a backoff time in the response, that time will take
+// precedence over these settings.
+//
+// These settings do not define any network retry strategy. That is entirely
+// handled by the gRPC ClientConn.
+//
+// If unset, the default retry policy will be used. It will retry the export
+// 5 seconds after receiving a retryable error and increase exponentially
+// after each error for no more than a total time of 1 minute.
+func WithRetry(settings RetryConfig) Option {
+	return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))}
 }
diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go
index e4e5c67e6f9..b6b135c142a 100644
--- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go
+++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go
@@ -21,24 +21,32 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"math/rand"
 	"net"
 	"net/http"
 	"path"
+	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"google.golang.org/protobuf/proto"
 
-	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig"
+	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry"
 	colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
 	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
 )
 
 const contentTypeProto = "application/x-protobuf"
 
+var gzPool = sync.Pool{
+	New: func() interface{} {
+		w := gzip.NewWriter(ioutil.Discard)
+		return w
+	},
+}
+
 // Keep it in sync with golang's DefaultTransport from net/http! We
 // have our own copy to avoid handling a situation where the
 // DefaultTransport is overwritten with some different implementation
@@ -57,11 +65,13 @@ var ourTransport = &http.Transport{
 }
 
 type client struct {
-	name       string
-	cfg        otlpconfig.SignalConfig
-	generalCfg otlpconfig.Config
-	client     *http.Client
-	stopCh     chan struct{}
+	name        string
+	cfg         otlpconfig.SignalConfig
+	generalCfg  otlpconfig.Config
+	requestFunc retry.RequestFunc
+	client      *http.Client
+	stopCh      chan struct{}
+	stopOnce    sync.Once
 }
 
 // NewClient creates a new HTTP metric client.
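The gzPool above lets the client reuse gzip.Writer allocations across export requests rather than constructing a new writer per payload. Independent of this exporter, the borrow/reset/close/return sequence looks like the following sketch (the names here are illustrative, not part of the patch):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
	"sync"
)

var gzPool = sync.Pool{
	New: func() interface{} { return gzip.NewWriter(ioutil.Discard) },
}

// compress gzips body, borrowing a writer from the pool instead of
// allocating a new one per call.
func compress(body []byte) ([]byte, error) {
	gz := gzPool.Get().(*gzip.Writer)
	defer gzPool.Put(gz)

	var b bytes.Buffer
	gz.Reset(&b) // point the pooled writer at a fresh buffer

	if _, err := gz.Write(body); err != nil {
		return nil, err
	}
	// Close flushes the gzip footer; without it the payload is truncated.
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func main() {
	out, err := compress([]byte("payload"))
	fmt.Println(len(out), err)
}
```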
@@ -73,7 +83,7 @@ func NewClient(opts ...Option) otlpmetric.Client { } for pathPtr, defaultPath := range map[*string]string{ - &cfg.Metrics.URLPath: defaultMetricsPath, + &cfg.Metrics.URLPath: otlpconfig.DefaultMetricsPath, } { tmp := strings.TrimSpace(*pathPtr) if tmp == "" { @@ -86,15 +96,6 @@ func NewClient(opts ...Option) otlpmetric.Client { } *pathPtr = tmp } - if cfg.MaxAttempts <= 0 { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.MaxAttempts > defaultMaxAttempts { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.Backoff <= 0 { - cfg.Backoff = defaultBackoff - } httpClient := &http.Client{ Transport: ourTransport, @@ -108,11 +109,12 @@ func NewClient(opts ...Option) otlpmetric.Client { stopCh := make(chan struct{}) return &client{ - name: "metrics", - cfg: cfg.Metrics, - generalCfg: cfg, - stopCh: stopCh, - client: httpClient, + name: "metrics", + cfg: cfg.Metrics, + generalCfg: cfg, + requestFunc: cfg.RetryConfig.RequestFunc(evaluate), + stopCh: stopCh, + client: httpClient, } } @@ -129,7 +131,9 @@ func (d *client) Start(ctx context.Context) error { // Stop shuts down the client and interrupt any in-flight request. func (d *client) Stop(ctx context.Context) error { - close(d.stopCh) + d.stopOnce.Do(func() { + close(d.stopCh) + }) select { case <-ctx.Done(): return ctx.Err() @@ -147,41 +151,150 @@ func (d *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.Res if err != nil { return err } - return d.send(ctx, rawRequest) -} -func (d *client) send(ctx context.Context, rawRequest []byte) error { - address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath) - var cancel context.CancelFunc - ctx, cancel = d.contextWithStop(ctx) + ctx, cancel := d.contextWithStop(ctx) defer cancel() - for i := 0; i < d.generalCfg.MaxAttempts; i++ { - response, err := d.singleSend(ctx, rawRequest, address) + + request, err := d.newRequest(rawRequest) + if err != nil { + return err + } + + return d.requestFunc(ctx, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + request.reset(ctx) + resp, err := d.client.Do(request.Request) if err != nil { return err } - // We don't care about the body, so try to read it - // into /dev/null and close it immediately. The - // reading part is to facilitate connection reuse. - _, _ = io.Copy(ioutil.Discard, response.Body) - _ = response.Body.Close() - switch response.StatusCode { + + var rErr error + switch resp.StatusCode { case http.StatusOK: - return nil - case http.StatusTooManyRequests: - fallthrough - case http.StatusServiceUnavailable: - select { - case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)): - continue - case <-ctx.Done(): - return ctx.Err() + // Success, do not retry. + case http.StatusTooManyRequests, + http.StatusServiceUnavailable: + // Retry-able failure. + rErr = newResponseError(resp.Header) + + // Going to retry, drain the body to reuse the connection. 
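+			// Draining the remainder of the body lets the Transport
+			// return the underlying connection to its idle pool for
+			// the retry instead of closing it.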
+			if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
+				_ = resp.Body.Close()
+				return err
+			}
 		default:
-			return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status)
+			rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
+		}
+
+		if err := resp.Body.Close(); err != nil {
+			return err
+		}
+		return rErr
+	})
+}
+
+func (d *client) newRequest(body []byte) (request, error) {
+	address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath)
+	r, err := http.NewRequest(http.MethodPost, address, nil)
+	if err != nil {
+		return request{Request: r}, err
+	}
+
+	for k, v := range d.cfg.Headers {
+		r.Header.Set(k, v)
+	}
+	r.Header.Set("Content-Type", contentTypeProto)
+
+	req := request{Request: r}
+	switch Compression(d.cfg.Compression) {
+	case NoCompression:
+		r.ContentLength = (int64)(len(body))
+		req.bodyReader = bodyReader(body)
+	case GzipCompression:
+		// Ensure the content length is not used.
+		r.ContentLength = -1
+		r.Header.Set("Content-Encoding", "gzip")
+
+		gz := gzPool.Get().(*gzip.Writer)
+		defer gzPool.Put(gz)
+
+		var b bytes.Buffer
+		gz.Reset(&b)
+
+		if _, err := gz.Write(body); err != nil {
+			return req, err
+		}
+		// Close needs to be called to ensure the body is fully written.
+		if err := gz.Close(); err != nil {
+			return req, err
+		}
+
+		req.bodyReader = bodyReader(b.Bytes())
+	}
+
+	return req, nil
+}
+
+// bodyReader returns a closure returning a new reader for buf.
+func bodyReader(buf []byte) func() io.ReadCloser {
+	return func() io.ReadCloser {
+		return ioutil.NopCloser(bytes.NewReader(buf))
+	}
+}
+
+// request wraps an http.Request with a resettable body reader.
+type request struct {
+	*http.Request
+
+	// bodyReader allows the same body to be used for multiple requests.
+	bodyReader func() io.ReadCloser
+}
+
+// reset reinitializes the request Body and uses ctx for the request.
+func (r *request) reset(ctx context.Context) {
+	r.Body = r.bodyReader()
+	r.Request = r.Request.WithContext(ctx)
+}
+
+// retryableError represents a request failure that can be retried.
+type retryableError struct {
+	throttle int64
+}
+
+// newResponseError returns a retryableError and will extract any explicit
+// throttle delay contained in headers.
+func newResponseError(header http.Header) error {
+	var rErr retryableError
+	if s, ok := header["Retry-After"]; ok {
+		if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
+			rErr.throttle = t
+		}
 	}
-	return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts)
+	return rErr
+}
+
+func (e retryableError) Error() string {
+	return "retry-able request failure"
+}
+
+// evaluate returns whether err is retry-able. If it is and it includes an
+// explicit throttling delay, that delay is also returned.
+func evaluate(err error) (bool, time.Duration) {
+	if err == nil {
+		return false, 0
+	}
+
+	rErr, ok := err.(retryableError)
+	if !ok {
+		return false, 0
+	}
+
+	// The throttle holds the parsed Retry-After header value, which is
+	// expressed in seconds.
+	return true, time.Duration(rErr.throttle) * time.Second
 }
 
 func (d *client) getScheme() string {
@@ -191,26 +304,6 @@ func (d *client) getScheme() string {
 	return "https"
 }
 
-func getWaitDuration(backoff time.Duration, i int) time.Duration {
-	// Strategy: after nth failed attempt, attempt resending after
-	// k * initialBackoff + jitter, where k is a random number in
-	// range [0, 2^n-1), and jitter is a random percentage of
-	// initialBackoff from [-5%, 5%).
- // - // Based on - // https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm - // - // Jitter is our addition. - - // There won't be an overflow, since i is capped to - // defaultMaxAttempts (5). - upperK := (int64)(1) << (i + 1) - jitterPercent := (rand.Float64() - 0.5) / 10. - jitter := jitterPercent * (float64)(backoff) - k := rand.Int63n(upperK) - return (time.Duration)(k)*backoff + (time.Duration)(jitter) -} - func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { // Unify the parent context Done signal with the client's stop // channel. @@ -226,51 +319,3 @@ func (d *client) contextWithStop(ctx context.Context) (context.Context, context. }(ctx, cancel) return ctx, cancel } - -func (d *client) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) { - request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil) - if err != nil { - return nil, err - } - bodyReader, contentLength, headers := d.prepareBody(rawRequest) - // Not closing bodyReader through defer, the HTTP Client's - // Transport will do it for us - request.Body = bodyReader - request.ContentLength = contentLength - for key, values := range headers { - for _, value := range values { - request.Header.Add(key, value) - } - } - return d.client.Do(request) -} - -func (d *client) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) { - var bodyReader io.ReadCloser - headers := http.Header{} - for k, v := range d.cfg.Headers { - headers.Set(k, v) - } - contentLength := (int64)(len(rawRequest)) - headers.Set("Content-Type", contentTypeProto) - requestReader := bytes.NewBuffer(rawRequest) - switch Compression(d.cfg.Compression) { - case NoCompression: - bodyReader = ioutil.NopCloser(requestReader) - case GzipCompression: - preader, pwriter := io.Pipe() - go func() { - defer pwriter.Close() - gzipper := gzip.NewWriter(pwriter) - defer gzipper.Close() - _, err := io.Copy(gzipper, requestReader) - if err != nil { - otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err)) - } - }() - headers.Set("Content-Encoding", "gzip") - bodyReader = preader - contentLength = -1 - } - return bodyReader, contentLength, headers -} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 4fb116972e9..e15d8b35d6d 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -16,7 +16,6 @@ package otlpmetrichttp_test import ( "context" - "fmt" "net/http" "os" "testing" @@ -144,32 +143,6 @@ func TestExporterShutdown(t *testing.T) { }) } -func TestRetry(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - client := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(len(statuses)+1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, client) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.NoError(t, err) - assert.Len(t, mc.GetMetrics(), 1) -} - func TestTimeout(t *testing.T) { mcCfg := mockCollectorConfig{ InjectDelay: 100 * time.Millisecond, @@ -191,58 +164,6 @@ func 
TestTimeout(t *testing.T) { assert.Equal(t, true, os.IsTimeout(err)) } -func TestRetryFailed(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) -} - -func TestNoRetry(t *testing.T) { - statuses := []int{ - http.StatusBadRequest, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(len(statuses)+1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Equal(t, fmt.Sprintf("failed to send metrics to http://%s/v1/metrics with HTTP status 400 Bad Request", mc.endpoint), err.Error()) - assert.Empty(t, mc.GetMetrics()) -} - func TestEmptyData(t *testing.T) { mcCfg := mockCollectorConfig{} mc := runMockCollector(t, mcCfg) @@ -263,88 +184,6 @@ func TestEmptyData(t *testing.T) { assert.NotEmpty(t, mc.GetMetrics()) } -func TestUnreasonableMaxAttempts(t *testing.T) { - // Max attempts is 5, we set collector to fail 7 times and try - // to configure max attempts to be either negative or too - // large. Since we set max attempts to 5 in such cases, - // exporting to the collector should fail. - type testcase struct { - name string - maxAttempts int - } - for _, tc := range []testcase{ - { - name: "negative max attempts", - maxAttempts: -3, - }, - { - name: "too large max attempts", - maxAttempts: 10, - }, - } { - t.Run(tc.name, func(t *testing.T) { - statuses := make([]int, 0, 7) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(tc.maxAttempts), - otlpmetrichttp.WithBackoff(time.Millisecond), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) - }) - } -} - -func TestUnreasonableBackoff(t *testing.T) { - // This sets backoff to negative value, which gets corrected - // to default backoff instead of being used. Default max - // attempts is 5, so we set the collector to fail 4 times, but - // we set the deadline to 3 times of the default backoff, so - // this should show that deadline is not met, meaning that the - // retries weren't immediate (as negative backoff could - // imply). 
- statuses := make([]int, 0, 4) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithBackoff(-time.Millisecond), - ) - ctx, cancel := context.WithTimeout(context.Background(), 3*(300*time.Millisecond)) - defer cancel() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(context.Background())) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) -} - func TestCancelledContext(t *testing.T) { statuses := []int{ http.StatusBadRequest, diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go new file mode 100644 index 00000000000..4ba01c85e5e --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package otlpmetrichttp + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnreasonableBackoff(t *testing.T) { + cIface := NewClient( + WithEndpoint("http://localhost"), + WithInsecure(), + WithBackoff(-time.Microsecond), + ) + require.IsType(t, &client{}, cIface) + c := cIface.(*client) + assert.True(t, c.generalCfg.RetryConfig.Enabled) + assert.Equal(t, 5*time.Second, c.generalCfg.RetryConfig.InitialInterval) + assert.Equal(t, 300*time.Millisecond, c.generalCfg.RetryConfig.MaxInterval) + assert.Equal(t, time.Minute, c.generalCfg.RetryConfig.MaxElapsedTime) +} + +func TestUnreasonableMaxAttempts(t *testing.T) { + type testcase struct { + name string + maxAttempts int + } + for _, tc := range []testcase{ + { + name: "negative max attempts", + maxAttempts: -3, + }, + { + name: "too large max attempts", + maxAttempts: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cIface := NewClient( + WithEndpoint("http://localhost"), + WithInsecure(), + WithMaxAttempts(tc.maxAttempts), + ) + require.IsType(t, &client{}, cIface) + c := cIface.(*client) + assert.True(t, c.generalCfg.RetryConfig.Enabled) + assert.Equal(t, 5*time.Second, c.generalCfg.RetryConfig.InitialInterval) + assert.Equal(t, 30*time.Second, c.generalCfg.RetryConfig.MaxInterval) + assert.Equal(t, 145*time.Second, c.generalCfg.RetryConfig.MaxElapsedTime) + }) + } +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 14740d119fe..14a3a27fa18 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -4,7 +4,6 @@ go 1.16 require ( github.com/stretchr/testify v1.7.0 - go.opentelemetry.io/otel v1.2.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.25.0 go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum index b2cd5fd0f24..103f87d6dff 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum @@ -4,6 +4,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/options.go b/exporters/otlp/otlpmetric/otlpmetrichttp/options.go index ccb59415a3c..95ac42fd7f0 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/options.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/options.go @@ -19,19 +19,7 @@ import ( "time" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" -) - -const ( - // defaultMaxAttempts describes how many times the driver - // should retry the sending of the payload in case of a - // retryable error. 
-	defaultMaxAttempts int = 5
-	// defaultMetricsPath is a default URL path for endpoint that
-	// receives metrics.
-	defaultMetricsPath string = "/v1/metrics"
-	// defaultBackoff is a default base backoff time used in the
-	// exponential backoff strategy.
-	defaultBackoff time.Duration = 300 * time.Millisecond
+	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry"
 )
 
 // Compression describes the compression used for payloads sent to the
@@ -52,6 +40,10 @@ type Option interface {
 	applyHTTPOption(*otlpconfig.Config)
 }
 
+// RetryConfig defines configuration for retrying batches in case of export
+// failure using an exponential backoff.
+type RetryConfig retry.Config
+
 type wrappedOption struct {
 	otlpconfig.HTTPOption
 }
@@ -84,15 +76,67 @@ func WithURLPath(urlPath string) Option {
 // will try to send the payload in case of retryable errors.
 // The max attempts is limited to at most 5 retries. If unset,
 // default (5) will be used.
+//
+// Deprecated: Use WithRetry instead.
 func WithMaxAttempts(maxAttempts int) Option {
-	return wrappedOption{otlpconfig.WithMaxAttempts(maxAttempts)}
+	if maxAttempts > 5 || maxAttempts < 0 {
+		maxAttempts = 5
+	}
+	return wrappedOption{
+		otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) {
+			cfg.RetryConfig.Enabled = true
+
+			var (
+				init = cfg.RetryConfig.InitialInterval
+				maxI = cfg.RetryConfig.MaxInterval
+				maxE = cfg.RetryConfig.MaxElapsedTime
+			)
+
+			if init == 0 {
+				init = retry.DefaultConfig.InitialInterval
+			}
+			if maxI == 0 {
+				maxI = retry.DefaultConfig.MaxInterval
+			}
+			if maxE == 0 {
+				maxE = retry.DefaultConfig.MaxElapsedTime
+			}
+			attempts := int64(maxE+init) / int64(maxI)
+
+			if int64(maxAttempts) == attempts {
+				return
+			}
+
+			maxE = time.Duration(int64(maxAttempts)*int64(maxI)) - init
+
+			cfg.RetryConfig.InitialInterval = init
+			cfg.RetryConfig.MaxInterval = maxI
+			cfg.RetryConfig.MaxElapsedTime = maxE
+		}),
+	}
 }
 
 // WithBackoff tells the driver to use the duration as a base of the
 // exponential backoff strategy. If unset, default (300ms) will be
 // used.
+//
+// Deprecated: Use WithRetry instead.
 func WithBackoff(duration time.Duration) Option {
-	return wrappedOption{otlpconfig.WithBackoff(duration)}
+	if duration < 0 {
+		duration = 300 * time.Millisecond
+	}
+	return wrappedOption{
+		otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) {
+			cfg.RetryConfig.Enabled = true
+			cfg.RetryConfig.MaxInterval = duration
+			if cfg.RetryConfig.InitialInterval == 0 {
+				cfg.RetryConfig.InitialInterval = retry.DefaultConfig.InitialInterval
+			}
+			if cfg.RetryConfig.MaxElapsedTime == 0 {
+				cfg.RetryConfig.MaxElapsedTime = retry.DefaultConfig.MaxElapsedTime
+			}
+		}),
+	}
 }
 
 // WithTLSClientConfig can be used to set up a custom TLS
@@ -120,3 +164,12 @@ func WithHeaders(headers map[string]string) Option {
 func WithTimeout(duration time.Duration) Option {
 	return wrappedOption{otlpconfig.WithTimeout(duration)}
 }
+
+// WithRetry configures the retry policy for transient errors that may occur
+// when exporting metrics. An exponential back-off algorithm is used to ensure
+// endpoints are not overwhelmed with retries. If unset, the default retry
+// policy will retry after 5 seconds and increase exponentially after each
+// error for a total of 1 minute.
+func WithRetry(rc RetryConfig) Option {
+	return wrappedOption{otlpconfig.WithRetry(retry.Config(rc))}
+}
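Taken together, WithRetry and RetryConfig replace the deprecated WithMaxAttempts and WithBackoff options. A minimal usage sketch follows; the endpoint is illustrative, and the retry values simply mirror the defaults exercised by the tests above:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
)

func main() {
	ctx := context.Background()

	client := otlpmetrichttp.NewClient(
		otlpmetrichttp.WithEndpoint("localhost:4317"), // illustrative endpoint
		otlpmetrichttp.WithInsecure(),
		// WithRetry supersedes the deprecated WithMaxAttempts/WithBackoff.
		otlpmetrichttp.WithRetry(otlpmetrichttp.RetryConfig{
			Enabled:         true,
			InitialInterval: 5 * time.Second,
			MaxInterval:     30 * time.Second,
			MaxElapsedTime:  time.Minute,
		}),
	)

	exp, err := otlpmetric.New(ctx, client)
	if err != nil {
		log.Fatalf("creating OTLP metric exporter: %v", err)
	}
	defer func() {
		if err := exp.Shutdown(ctx); err != nil {
			log.Printf("shutting down exporter: %v", err)
		}
	}()
}
```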