From d0f70a3a2479fcc304c0ec0c1afd3c75ea231224 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 1 Dec 2021 14:15:33 -0800 Subject: [PATCH 1/5] Use gRPC ClientConn for otlpmetricgrpc conn handling --- CHANGELOG.md | 1 + exporters/otlp/otlpmetric/go.mod | 1 - .../internal/connection/alignment_test.go | 38 -- .../internal/connection/connection.go | 431 ------------------ .../internal/connection/connection_test.go | 89 ---- .../otlpmetric/internal/otlpconfig/options.go | 71 +-- .../internal/otlpmetrictest/client.go | 16 + .../otlp/otlpmetric/internal/retry/retry.go | 137 ++++++ .../otlpmetric/internal/retry/retry_test.go | 197 ++++++++ .../otlp/otlpmetric/otlpmetricgrpc/client.go | 250 +++++++++- .../otlpmetric/otlpmetricgrpc/client_test.go | 425 +---------------- .../otlpmetricgrpc/client_unit_test.go | 193 ++++++++ .../otlpmetricgrpc/mock_collector_test.go | 62 +-- .../otlp/otlpmetric/otlpmetricgrpc/options.go | 118 +++-- .../otlp/otlpmetric/otlpmetrichttp/go.sum | 1 + 15 files changed, 919 insertions(+), 1111 deletions(-) delete mode 100644 exporters/otlp/otlpmetric/internal/connection/alignment_test.go delete mode 100644 exporters/otlp/otlpmetric/internal/connection/connection.go delete mode 100644 exporters/otlp/otlpmetric/internal/connection/connection_test.go create mode 100644 exporters/otlp/otlpmetric/internal/retry/retry.go create mode 100644 exporters/otlp/otlpmetric/internal/retry/retry_test.go create mode 100644 exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d083f219ea..025c9498ef8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329) - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412) +- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#TBD) ### Removed diff --git a/exporters/otlp/otlpmetric/go.mod b/exporters/otlp/otlpmetric/go.mod index bebf80f77f3..287d571fac5 100644 --- a/exporters/otlp/otlpmetric/go.mod +++ b/exporters/otlp/otlpmetric/go.mod @@ -12,7 +12,6 @@ require ( go.opentelemetry.io/otel/sdk/export/metric v0.25.0 go.opentelemetry.io/otel/sdk/metric v0.25.0 go.opentelemetry.io/proto/otlp v0.11.0 - google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 google.golang.org/protobuf v1.27.1 ) diff --git a/exporters/otlp/otlpmetric/internal/connection/alignment_test.go b/exporters/otlp/otlpmetric/internal/connection/alignment_test.go deleted file mode 100644 index aad85902c28..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/alignment_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection - -import ( - "os" - "testing" - "unsafe" - - ottest "go.opentelemetry.io/otel/internal/internaltest" -) - -// Ensure struct alignment prior to running tests. -func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "Connection.lastConnectErrPtr", - Offset: unsafe.Offsetof(Connection{}.lastConnectErrPtr), - }, - } - if !ottest.Aligned8Byte(fields, os.Stderr) { - os.Exit(1) - } - - os.Exit(m.Run()) -} diff --git a/exporters/otlp/otlpmetric/internal/connection/connection.go b/exporters/otlp/otlpmetric/internal/connection/connection.go deleted file mode 100644 index dcb6db403fb..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/connection.go +++ /dev/null @@ -1,431 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/connection" - -import ( - "context" - "fmt" - "math/rand" - "sync" - "sync/atomic" - "time" - "unsafe" - - "github.com/cenkalti/backoff/v4" - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" -) - -type Connection struct { - // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. 
- lastConnectErrPtr unsafe.Pointer - - // mu protects the Connection as it is accessed by the - // exporter goroutines and background Connection goroutine - mu sync.Mutex - cc *grpc.ClientConn - - // these fields are read-only after constructor is finished - cfg otlpconfig.Config - SCfg otlpconfig.SignalConfig - metadata metadata.MD - newConnectionHandler func(cc *grpc.ClientConn) - - // these channels are created once - disconnectedCh chan bool - backgroundConnectionDoneCh chan struct{} - stopCh chan struct{} - - // this is for tests, so they can replace the closing - // routine without a worry of modifying some global variable - // or changing it back to original after the test is done - closeBackgroundConnectionDoneCh func(ch chan struct{}) -} - -func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *Connection { - c := new(Connection) - c.newConnectionHandler = handler - c.cfg = cfg - c.SCfg = sCfg - if len(c.SCfg.Headers) > 0 { - c.metadata = metadata.New(c.SCfg.Headers) - } - c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - close(ch) - } - return c -} - -func (c *Connection) StartConnection(ctx context.Context) error { - c.stopCh = make(chan struct{}) - c.disconnectedCh = make(chan bool, 1) - c.backgroundConnectionDoneCh = make(chan struct{}) - - if err := c.connect(ctx); err == nil { - c.setStateConnected() - } else { - c.SetStateDisconnected(err) - } - go c.indefiniteBackgroundConnection() - - // TODO: proper error handling when initializing connections. - // We can report permanent errors, e.g., invalid settings. - return nil -} - -func (c *Connection) LastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) - if errPtr == nil { - return nil - } - return *errPtr -} - -func (c *Connection) saveLastConnectError(err error) { - var errPtr *error - if err != nil { - errPtr = &err - } - atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) -} - -func (c *Connection) SetStateDisconnected(err error) { - c.saveLastConnectError(err) - select { - case c.disconnectedCh <- true: - default: - } - c.newConnectionHandler(nil) -} - -func (c *Connection) setStateConnected() { - c.saveLastConnectError(nil) -} - -func (c *Connection) Connected() bool { - return c.LastConnectError() == nil -} - -const defaultConnReattemptPeriod = 10 * time.Second - -func (c *Connection) indefiniteBackgroundConnection() { - defer func() { - c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) - }() - - connReattemptPeriod := c.cfg.ReconnectionPeriod - if connReattemptPeriod <= 0 { - connReattemptPeriod = defaultConnReattemptPeriod - } - - // No strong seeding required, nano time can - // already help with pseudo uniqueness. - rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) - - // maxJitterNanos: 70% of the connectionReattemptPeriod - maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) - - for { - // Otherwise these will be the normal scenarios to enable - // reconnection if we trip out. - // 1. If we've stopped, return entirely - // 2. Otherwise block until we are disconnected, and - // then retry connecting - select { - case <-c.stopCh: - return - - case <-c.disconnectedCh: - // Quickly check if we haven't stopped at the - // same time. 
- select { - case <-c.stopCh: - return - - default: - } - - // Normal scenario that we'll wait for - } - - if err := c.connect(context.Background()); err == nil { - c.setStateConnected() - } else { - // this code is unreachable in most cases - // c.connect does not establish Connection - c.SetStateDisconnected(err) - } - - // Apply some jitter to avoid lockstep retrials of other - // collector-exporters. Lockstep retrials could result in an - // innocent DDOS, by clogging the machine's resources and network. - jitter := time.Duration(rng.Int63n(maxJitterNanos)) - select { - case <-c.stopCh: - return - case <-time.After(connReattemptPeriod + jitter): - } - } -} - -func (c *Connection) connect(ctx context.Context) error { - cc, err := c.dialToCollector(ctx) - if err != nil { - return err - } - c.setConnection(cc) - c.newConnectionHandler(cc) - return nil -} - -// setConnection sets cc as the client Connection and returns true if -// the Connection state changed. -func (c *Connection) setConnection(cc *grpc.ClientConn) bool { - c.mu.Lock() - defer c.mu.Unlock() - - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if c.cc == cc { - return false - } - - // If the previous clientConn was non-nil, close it - if c.cc != nil { - _ = c.cc.Close() - } - c.cc = cc - return true -} - -func (c *Connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - if c.cfg.GRPCConn != nil { - return c.cfg.GRPCConn, nil - } - - dialOpts := []grpc.DialOption{} - if c.cfg.ServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig)) - } - if c.SCfg.GRPCCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.SCfg.GRPCCredentials)) - } else if c.SCfg.Insecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if c.SCfg.Compression == otlpconfig.GzipCompression { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) - } - if len(c.cfg.DialOptions) != 0 { - dialOpts = append(dialOpts, c.cfg.DialOptions...) - } - - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - ctx = c.ContextWithMetadata(ctx) - return grpc.DialContext(ctx, c.SCfg.Endpoint, dialOpts...) -} - -func (c *Connection) ContextWithMetadata(ctx context.Context) context.Context { - if c.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, c.metadata) - } - return ctx -} - -func (c *Connection) Shutdown(ctx context.Context) error { - close(c.stopCh) - // Ensure that the backgroundConnector returns - select { - case <-c.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } - - c.mu.Lock() - cc := c.cc - c.cc = nil - c.mu.Unlock() - - if cc != nil { - return cc.Close() - } - - return nil -} - -func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { - // Unify the parent context Done signal with the Connection's - // stop channel. - ctx, cancel := context.WithCancel(ctx) - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - // Nothing to do, either cancelled or deadline - // happened. - case <-c.stopCh: - cancel() - } - }(ctx, cancel) - return ctx, cancel -} - -func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error { - expBackoff := newExponentialBackoff(c.cfg.RetrySettings) - - for { - err := fn(ctx) - if err == nil { - // request succeeded. 
- return nil - } - - if !c.cfg.RetrySettings.Enabled { - return err - } - - // We have an error, check gRPC status code. - st := status.Convert(err) - if st.Code() == codes.OK { - // Not really an error, still success. - return nil - } - - // Now, this is this a real error. - - if !shouldRetry(st.Code()) { - // It is not a retryable error, we should not retry. - return err - } - - // Need to retry. - - throttle := getThrottleDuration(st) - - backoffDelay := expBackoff.NextBackOff() - if backoffDelay == backoff.Stop { - // throw away the batch - err = fmt.Errorf("max elapsed time expired: %w", err) - return err - } - - var delay time.Duration - - if backoffDelay > throttle { - delay = backoffDelay - } else { - if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime { - err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err) - return err - } - - // Respect server throttling. - delay = throttle - } - - // back-off, but get interrupted when shutting down or request is cancelled or timed out. - err = func() error { - dt := time.NewTimer(delay) - defer dt.Stop() - - select { - case <-ctx.Done(): - return ctx.Err() - case <-c.stopCh: - return fmt.Errorf("interrupted due to shutdown: %w", err) - case <-dt.C: - } - - return nil - }() - - if err != nil { - return err - } - - } -} - -func shouldRetry(code codes.Code) bool { - switch code { - case codes.OK: - // Success. This function should not be called for this code, the best we - // can do is tell the caller not to retry. - return false - - case codes.Canceled, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted, - codes.OutOfRange, - codes.Unavailable, - codes.DataLoss: - // These are retryable errors. - return true - - case codes.Unknown, - codes.InvalidArgument, - codes.Unauthenticated, - codes.PermissionDenied, - codes.NotFound, - codes.AlreadyExists, - codes.FailedPrecondition, - codes.Unimplemented, - codes.Internal: - // These are fatal errors, don't retry. - return false - - default: - // Don't retry on unknown codes. - return false - } -} - -func getThrottleDuration(status *status.Status) time.Duration { - // See if throttling information is available. - for _, detail := range status.Details() { - if t, ok := detail.(*errdetails.RetryInfo); ok { - if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 { - // We are throttled. Wait before retrying as requested by the server. - return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond - } - return 0 - } - } - return 0 -} - -func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff { - // Do not use NewExponentialBackOff since it calls Reset and the code here must - // call Reset after changing the InitialInterval (this saves an unnecessary call to Now). 
- expBackoff := &backoff.ExponentialBackOff{ - InitialInterval: rs.InitialInterval, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: rs.MaxInterval, - MaxElapsedTime: rs.MaxElapsedTime, - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } - expBackoff.Reset() - - return expBackoff -} diff --git a/exporters/otlp/otlpmetric/internal/connection/connection_test.go b/exporters/otlp/otlpmetric/internal/connection/connection_test.go deleted file mode 100644 index f842fbd48e7..00000000000 --- a/exporters/otlp/otlpmetric/internal/connection/connection_test.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" -) - -func TestGetThrottleDuration(t *testing.T) { - tts := []struct { - stsFn func() (*status.Status, error) - throttle time.Duration - }{ - { - stsFn: func() (*status.Status, error) { - return status.New( - codes.OK, - "status with no retry info", - ), nil - }, - throttle: 0, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with retry info") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)}, - ) - }, - throttle: 15 * time.Millisecond, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info detail") - return st.WithDetails( - &errdetails.ErrorInfo{Reason: "no throttle detail"}, - ) - }, - throttle: 0, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with error info and retry info") - return st.WithDetails( - &errdetails.ErrorInfo{Reason: "no throttle detail"}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, - ) - }, - throttle: 13 * time.Minute, - }, - { - stsFn: func() (*status.Status, error) { - st := status.New(codes.ResourceExhausted, "status with two retry info should take the first") - return st.WithDetails( - &errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)}, - &errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)}, - ) - }, - throttle: 13 * time.Minute, - }, - } - - for _, tt := range tts { - sts, _ := tt.stsFn() - t.Run(sts.Message(), func(t *testing.T) { - th := getThrottleDuration(sts) - require.Equal(t, tt.throttle, th) - }) - } -} diff --git a/exporters/otlp/otlpmetric/internal/otlpconfig/options.go b/exporters/otlp/otlpmetric/internal/otlpconfig/options.go index 4a9c326f412..485555364b9 100644 --- a/exporters/otlp/otlpmetric/internal/otlpconfig/options.go +++ b/exporters/otlp/otlpmetric/internal/otlpconfig/options.go @@ -20,7 +20,11 @@ import ( "time" "google.golang.org/grpc" + 
"google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding/gzip" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" ) const ( @@ -39,16 +43,6 @@ const ( DefaultTimeout time.Duration = 10 * time.Second ) -var ( - // defaultRetrySettings is a default settings for the retry policy. - defaultRetrySettings = RetrySettings{ - Enabled: true, - InitialInterval: 5 * time.Second, - MaxInterval: 30 * time.Second, - MaxElapsedTime: time.Minute, - } -) - type ( SignalConfig struct { Endpoint string @@ -67,16 +61,13 @@ type ( // Signal specific configurations Metrics SignalConfig - // HTTP configurations - MaxAttempts int - Backoff time.Duration + RetryConfig retry.Config // gRPC configurations ReconnectionPeriod time.Duration ServiceConfig string DialOptions []grpc.DialOption GRPCConn *grpc.ClientConn - RetrySettings RetrySettings } ) @@ -88,12 +79,46 @@ func NewDefaultConfig() Config { Compression: NoCompression, Timeout: DefaultTimeout, }, - RetrySettings: defaultRetrySettings, + RetryConfig: retry.DefaultConfig, } return c } +// NewGRPCConfig returns a new Config with all settings applied from opts and +// any unset setting using the default gRPC config values. +func NewGRPCConfig(opts ...GRPCOption) Config { + cfg := NewDefaultConfig() + ApplyGRPCEnvConfigs(&cfg) + for _, opt := range opts { + opt.ApplyGRPCOption(&cfg) + } + + if cfg.ServiceConfig != "" { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultServiceConfig(cfg.ServiceConfig)) + } + if cfg.Metrics.GRPCCredentials != nil { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(cfg.Metrics.GRPCCredentials)) + } else if cfg.Metrics.Insecure { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithInsecure()) + } + if cfg.Metrics.Compression == GzipCompression { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + } + if len(cfg.DialOptions) != 0 { + cfg.DialOptions = append(cfg.DialOptions, cfg.DialOptions...) + } + if cfg.ReconnectionPeriod != 0 { + p := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + MinConnectTimeout: cfg.ReconnectionPeriod, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithConnectParams(p)) + } + + return cfg +} + type ( // GenericOption applies an option to the HTTP or gRPC driver. 
 	GenericOption interface {
@@ -218,9 +240,9 @@ func WithURLPath(urlPath string) GenericOption {
 	})
 }
 
-func WithRetry(settings RetrySettings) GenericOption {
+func WithRetry(rc retry.Config) GenericOption {
 	return newGenericOption(func(cfg *Config) {
-		cfg.RetrySettings = settings
+		cfg.RetryConfig = rc
 	})
 }
 
@@ -255,15 +277,3 @@ func WithTimeout(duration time.Duration) GenericOption {
 		cfg.Metrics.Timeout = duration
 	})
 }
-
-func WithMaxAttempts(maxAttempts int) GenericOption {
-	return newGenericOption(func(cfg *Config) {
-		cfg.MaxAttempts = maxAttempts
-	})
-}
-
-func WithBackoff(duration time.Duration) GenericOption {
-	return newGenericOption(func(cfg *Config) {
-		cfg.Backoff = duration
-	})
-}
diff --git a/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
index fb29e3ede90..c248521ee14 100644
--- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
+++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/client.go
@@ -55,6 +55,14 @@ func initializeExporter(t *testing.T, client otlpmetric.Client) *otlpmetric.Expo
 }
 
 func testClientStopHonorsTimeout(t *testing.T, client otlpmetric.Client) {
+	t.Cleanup(func() {
+		// The test is looking for a failed shut down. Call Stop a second time
+		// with an un-expired context to give the client a second chance at
+		// cleaning up. There is no guarantee from the Client interface that
+		// this will succeed, so there is no need to check the error (just
+		// give it a best-effort try).
+		_ = client.Stop(context.Background())
+	})
 	e := initializeExporter(t, client)
 
 	innerCtx, innerCancel := context.WithTimeout(context.Background(), time.Microsecond)
@@ -68,6 +76,14 @@ func testClientStopHonorsTimeout(t *testing.T, client otlpmetric.Client) {
 }
 
 func testClientStopHonorsCancel(t *testing.T, client otlpmetric.Client) {
+	t.Cleanup(func() {
+		// The test is looking for a failed shut down. Call Stop a second time
+		// with an un-expired context to give the client a second chance at
+		// cleaning up. There is no guarantee from the Client interface that
+		// this will succeed, so there is no need to check the error (just
+		// give it a best-effort try).
+		_ = client.Stop(context.Background())
+	})
 	e := initializeExporter(t, client)
 
 	ctx, innerCancel := context.WithCancel(context.Background())
diff --git a/exporters/otlp/otlpmetric/internal/retry/retry.go b/exporters/otlp/otlpmetric/internal/retry/retry.go
new file mode 100644
index 00000000000..830208f82f3
--- /dev/null
+++ b/exporters/otlp/otlpmetric/internal/retry/retry.go
@@ -0,0 +1,139 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry"
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+)
+
+// DefaultConfig is the recommended default Config to use.
+var DefaultConfig = Config{
+	Enabled:         true,
+	InitialInterval: 5 * time.Second,
+	MaxInterval:     30 * time.Second,
+	MaxElapsedTime:  time.Minute,
+}
+
+// Config defines configuration for retrying batches in case of export failure
+// using an exponential backoff.
+type Config struct {
+	// Enabled indicates whether to retry sending batches in case of export
+	// failure.
+	Enabled bool
+	// InitialInterval is the time to wait after the first failure before
+	// retrying.
+	InitialInterval time.Duration
+	// MaxInterval is the upper bound on backoff interval. Once this value is
+	// reached the delay between consecutive retries will always be
+	// `MaxInterval`.
+	MaxInterval time.Duration
+	// MaxElapsedTime is the maximum amount of time (including retries) spent
+	// trying to send a request/batch. Once this value is reached, the data
+	// is discarded.
+	MaxElapsedTime time.Duration
+}
+
+// RequestFunc wraps a request with retry logic.
+type RequestFunc func(context.Context, func(context.Context) error) error
+
+// EvaluateFunc returns whether an error is retryable and an explicit throttle
+// duration to honor if one was included in the error.
+type EvaluateFunc func(error) (bool, time.Duration)
+
+// RequestFunc returns a RequestFunc using the evaluate function to determine
+// if requests can be retried, honoring the backoff configuration of c.
+func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
+	if !c.Enabled {
+		return func(ctx context.Context, fn func(context.Context) error) error {
+			return fn(ctx)
+		}
+	}
+
+	// Do not use NewExponentialBackOff since it calls Reset and the code here
+	// must call Reset after changing the InitialInterval (this saves an
+	// unnecessary call to Now).
+	b := &backoff.ExponentialBackOff{
+		InitialInterval:     c.InitialInterval,
+		RandomizationFactor: backoff.DefaultRandomizationFactor,
+		Multiplier:          backoff.DefaultMultiplier,
+		MaxInterval:         c.MaxInterval,
+		MaxElapsedTime:      c.MaxElapsedTime,
+		Stop:                backoff.Stop,
+		Clock:               backoff.SystemClock,
+	}
+	b.Reset()
+
+	return func(ctx context.Context, fn func(context.Context) error) error {
+		for {
+			err := fn(ctx)
+			if err == nil {
+				return nil
+			}
+
+			retryable, throttle := evaluate(err)
+			if !retryable {
+				return err
+			}
+
+			bOff := b.NextBackOff()
+			if bOff == backoff.Stop {
+				return fmt.Errorf("max retry time elapsed: %w", err)
+			}
+
+			// Wait for the greater of the backoff or throttle delay.
+			var delay time.Duration
+			if bOff > throttle {
+				delay = bOff
+			} else {
+				elapsed := b.GetElapsedTime()
+				if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime {
+					return fmt.Errorf("max retry time would elapse: %w", err)
+				}
+				delay = throttle
+			}
+
+			if err := waitFunc(ctx, delay); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// Allow override for testing.
+var waitFunc = wait
+
+func wait(ctx context.Context, delay time.Duration) error {
+	timer := time.NewTimer(delay)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		// Handle the case where the timer and context deadline end
+		// simultaneously by prioritizing the timer expiration nil value
+		// response.
+		select {
+		case <-timer.C:
+		default:
+			return ctx.Err()
+		}
+	case <-timer.C:
+	}
+
+	return nil
+}
diff --git a/exporters/otlp/otlpmetric/internal/retry/retry_test.go b/exporters/otlp/otlpmetric/internal/retry/retry_test.go
new file mode 100644
index 00000000000..d2b5b6c4b59
--- /dev/null
+++ b/exporters/otlp/otlpmetric/internal/retry/retry_test.go
@@ -0,0 +1,197 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package retry
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWait(t *testing.T) {
+	tests := []struct {
+		ctx      context.Context
+		delay    time.Duration
+		expected error
+	}{
+		{
+			ctx:      context.Background(),
+			delay:    time.Duration(0),
+			expected: nil,
+		},
+		{
+			ctx:      context.Background(),
+			delay:    time.Duration(1),
+			expected: nil,
+		},
+		{
+			ctx:      context.Background(),
+			delay:    time.Duration(-1),
+			expected: nil,
+		},
+		{
+			ctx: func() context.Context {
+				ctx, cancel := context.WithCancel(context.Background())
+				cancel()
+				return ctx
+			}(),
+			// Ensure the timer and context do not end simultaneously.
+			delay:    1 * time.Hour,
+			expected: context.Canceled,
+		},
+	}
+
+	for _, test := range tests {
+		assert.Equal(t, test.expected, wait(test.ctx, test.delay))
+	}
+}
+
+func TestNonRetryableError(t *testing.T) {
+	ev := func(error) (bool, time.Duration) { return false, 0 }
+
+	reqFunc := Config{
+		Enabled:         true,
+		InitialInterval: 1 * time.Nanosecond,
+		MaxInterval:     1 * time.Nanosecond,
+		// Never stop retrying.
+		MaxElapsedTime: 0,
+	}.RequestFunc(ev)
+	ctx := context.Background()
+	assert.NoError(t, reqFunc(ctx, func(context.Context) error {
+		return nil
+	}))
+	assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
+		return assert.AnError
+	}), assert.AnError)
+}
+
+func TestThrottledRetry(t *testing.T) {
+	// Ensure the throttle delay is used by making it longer than the backoff delay.
+	throttleDelay, backoffDelay := time.Second, time.Nanosecond
+
+	ev := func(error) (bool, time.Duration) {
+		// Retry everything with a throttle delay.
+		return true, throttleDelay
+	}
+
+	reqFunc := Config{
+		Enabled:         true,
+		InitialInterval: backoffDelay,
+		MaxInterval:     backoffDelay,
+		// Never stop retrying.
+		MaxElapsedTime: 0,
+	}.RequestFunc(ev)
+
+	origWait := waitFunc
+	var done bool
+	waitFunc = func(_ context.Context, delay time.Duration) error {
+		assert.Equal(t, throttleDelay, delay, "retry not throttled")
+		// Try twice to ensure the call is attempted again after the delay.
+		if done {
+			return assert.AnError
+		}
+		done = true
+		return nil
+	}
+	defer func() { waitFunc = origWait }()
+
+	ctx := context.Background()
+	assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
+		return errors.New("not this error")
+	}), assert.AnError)
+}
+
+func TestBackoffRetry(t *testing.T) {
+	ev := func(error) (bool, time.Duration) { return true, 0 }
+
+	delay := time.Nanosecond
+	reqFunc := Config{
+		Enabled:         true,
+		InitialInterval: delay,
+		MaxInterval:     delay,
+		// Never stop retrying.
+		MaxElapsedTime: 0,
+	}.RequestFunc(ev)
+
+	origWait := waitFunc
+	var done bool
+	waitFunc = func(_ context.Context, d time.Duration) error {
+		assert.Equal(t, delay, d, "retry not backed off")
+		// Try twice to ensure the call is attempted again after the delay.
+		if done {
+			return assert.AnError
+		}
+		done = true
+		return nil
+	}
+	defer func() { waitFunc = origWait }()
+
+	ctx := context.Background()
+	assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
+		return errors.New("not this error")
+	}), assert.AnError)
+}
+
+func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) {
+	// Ensure the throttle delay is used by making it longer than the backoff delay.
+	tDelay, bDelay := time.Hour, time.Nanosecond
+	ev := func(error) (bool, time.Duration) { return true, tDelay }
+	reqFunc := Config{
+		Enabled:         true,
+		InitialInterval: bDelay,
+		MaxInterval:     bDelay,
+		MaxElapsedTime:  tDelay - (time.Nanosecond),
+	}.RequestFunc(ev)
+
+	ctx := context.Background()
+	assert.Contains(t, reqFunc(ctx, func(context.Context) error {
+		return assert.AnError
+	}).Error(), "max retry time would elapse: ")
+}
+
+func TestMaxElapsedTime(t *testing.T) {
+	ev := func(error) (bool, time.Duration) { return true, 0 }
+	delay := time.Nanosecond
+	reqFunc := Config{
+		Enabled: true,
+		// InitialInterval > MaxElapsedTime means immediate return.
+		InitialInterval: 2 * delay,
+		MaxElapsedTime:  delay,
+	}.RequestFunc(ev)
+
+	ctx := context.Background()
+	assert.Contains(t, reqFunc(ctx, func(context.Context) error {
+		return assert.AnError
+	}).Error(), "max retry time elapsed: ")
+}
+
+func TestRetryNotEnabled(t *testing.T) {
+	ev := func(error) (bool, time.Duration) {
+		t.Error("evaluated retry when not enabled")
+		return false, 0
+	}
+
+	reqFunc := Config{}.RequestFunc(ev)
+	ctx := context.Background()
+	assert.NoError(t, reqFunc(ctx, func(context.Context) error {
+		return nil
+	}))
+	assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
+		return assert.AnError
+	}), assert.AnError)
+}
diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
index c70be399fc7..9ef2c9eb520 100644
--- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
+++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
@@ -17,43 +17,263 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme
 import (
 	"context"
 	"errors"
-	"fmt"
 	"sync"
+	"time"
 
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
-	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/connection"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig"
+	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry"
 
 	colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
 	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
 )
 
 type client struct {
-	connection *connection.Connection
+	endpoint      string
+	dialOpts      []grpc.DialOption
+	metadata      metadata.MD
+	exportTimeout time.Duration
+	requestFunc   retry.RequestFunc
 
-	lock          sync.Mutex
-	metricsClient colmetricpb.MetricsServiceClient
+	// stopCtx is used as a parent context for all exports. Therefore, when it
+	// is canceled with the stopFunc all exports are canceled.
+	stopCtx context.Context
+	// stopFunc cancels stopCtx, stopping any active exports.
+	stopFunc context.CancelFunc
+
+	// ourConn keeps track of where conn was created: true if created here on
+	// Start, or false if passed with an option. This is important on Shutdown
+	// as the conn should only be closed if created here on start. Otherwise,
+	// it is up to the process that passed the conn to close it.
+	ourConn bool
+	conn    *grpc.ClientConn
+	mscMu   sync.RWMutex
+	msc     colmetricpb.MetricsServiceClient
 }
 
-var (
-	errNoClient = errors.New("no client")
-)
+// Compile-time check *client implements otlpmetric.Client.
+var _ otlpmetric.Client = (*client)(nil)
 
 // NewClient creates a new gRPC metric client.
 func NewClient(opts ...Option) otlpmetric.Client {
-	cfg := otlpconfig.NewDefaultConfig()
-	otlpconfig.ApplyGRPCEnvConfigs(&cfg)
-	for _, opt := range opts {
-		opt.applyGRPCOption(&cfg)
+	return newClient(opts...)
+}
+
+func newClient(opts ...Option) *client {
+	cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	c := &client{
+		endpoint:      cfg.Metrics.Endpoint,
+		exportTimeout: cfg.Metrics.Timeout,
+		requestFunc:   cfg.RetryConfig.RequestFunc(retryable),
+		dialOpts:      cfg.DialOptions,
+		stopCtx:       ctx,
+		stopFunc:      cancel,
+		conn:          cfg.GRPCConn,
 	}
 
-	c := &client{}
-	c.connection = connection.NewConnection(cfg, cfg.Metrics, c.handleNewConnection)
+	if len(cfg.Metrics.Headers) > 0 {
+		c.metadata = metadata.New(cfg.Metrics.Headers)
+	}
 
 	return c
 }
 
+// Start establishes a gRPC connection to the collector.
+func (c *client) Start(ctx context.Context) error {
+	if c.conn == nil {
+		// If the caller did not provide a ClientConn when the client was
+		// created, create one using the configuration they did provide.
+		conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...)
+		if err != nil {
+			return err
+		}
+		// Keep track that we own the lifecycle of this conn and need to close
+		// it on Shutdown.
+		c.ourConn = true
+		c.conn = conn
+	}
+
+	// The otlpmetric.Client interface states this method is called just once,
+	// so no need to check if already started.
+	c.mscMu.Lock()
+	c.msc = colmetricpb.NewMetricsServiceClient(c.conn)
+	c.mscMu.Unlock()
+
+	return nil
+}
+
+var errAlreadyStopped = errors.New("the client is already stopped")
+
+// Stop shuts down the client.
+//
+// Any active connections to a remote endpoint are closed if they were created
+// by the client. Any gRPC connection passed during creation using
+// WithGRPCConn will not be closed. It is the caller's responsibility to
+// handle cleanup of that resource.
+//
+// This method synchronizes with the UploadMetrics method of the client. It
+// will wait for any active calls to that method to complete unimpeded, or it
+// will cancel any active calls if ctx expires. If ctx expires, the context
+// error will be forwarded as the returned error. All client-held resources
+// will still be released in this situation.
+//
+// If the client has already stopped, an error will be returned describing
+// this.
+func (c *client) Stop(ctx context.Context) error {
+	// Acquire the c.mscMu lock within the ctx lifetime.
+	acquired := make(chan struct{})
+	go func() {
+		c.mscMu.Lock()
+		close(acquired)
+	}()
+	var err error
+	select {
+	case <-ctx.Done():
+		// The Stop timeout was reached. Kill any remaining exports to force
+		// the release of the lock, and save the timeout error to return,
+		// signaling that the shutdown timed out before stopping cleanly.
+		c.stopFunc()
+		err = ctx.Err()
+
+		// To ensure the client is not left in a dirty state c.msc needs to be
+		// set to nil. To avoid the race condition when doing this, ensure
+		// that all the exports are killed (initiated by c.stopFunc).
+		<-acquired
+	case <-acquired:
+	}
+	// Hold the mscMu lock for the rest of the function to ensure no new
+	// exports are started.
+	defer c.mscMu.Unlock()
+
+	// The otlpmetric.Client interface states this method is called only
+	// once, but there is no guarantee it is called after Start. Ensure the
+	// client is started before doing anything and let the caller know if they
+	// made a mistake.
+	if c.msc == nil {
+		return errAlreadyStopped
+	}
+
+	// Clear c.msc to signal the client is stopped.
+	c.msc = nil
+
+	if c.ourConn {
+		closeErr := c.conn.Close()
+		// A context timeout error takes precedence over this error.
+		if err == nil && closeErr != nil {
+			err = closeErr
+		}
+	}
+	return err
+}
+
+var errShutdown = errors.New("the client is shut down")
+
+// UploadMetrics sends a batch of metrics.
+//
+// Retryable errors from the server will be handled according to any
+// RetryConfig the client was created with.
+func (c *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
+	// Hold a read lock to ensure a shut down initiated after this starts does
+	// not abandon the export. This read lock acquire has a lower priority
+	// than a write lock acquire (i.e. Stop), meaning if the client is
+	// shutting down this will come after the shut down.
+	c.mscMu.RLock()
+	defer c.mscMu.RUnlock()
+
+	if c.msc == nil {
+		return errShutdown
+	}
+
+	ctx, cancel := c.exportContext(ctx)
+	defer cancel()
+
+	return c.requestFunc(ctx, func(iCtx context.Context) error {
+		_, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
+			ResourceMetrics: protoMetrics,
+		})
+		// nil is converted to OK.
+		if status.Code(err) == codes.OK {
+			// Success.
+			return nil
+		}
+		return err
+	})
+}
+
+// exportContext returns a copy of parent with an appropriate deadline and
+// cancellation function.
+//
+// It is the caller's responsibility to cancel the returned context once its
+// use is complete, via the parent or directly with the returned CancelFunc, to
+// ensure all resources are correctly released.
+func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+	)
+
+	if c.exportTimeout > 0 {
+		ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
+	} else {
+		ctx, cancel = context.WithCancel(parent)
+	}
+
+	if c.metadata.Len() > 0 {
+		ctx = metadata.NewOutgoingContext(ctx, c.metadata)
+	}
+
+	// Unify the client stopCtx with the parent.
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-c.stopCtx.Done():
+			// Cancel the export as the shutdown has timed out.
+			cancel()
+		}
+	}()
+
+	return ctx, cancel
+}
+
+// retryable returns whether err identifies a request that can be retried and
+// a duration to wait for if an explicit throttle time is included in err.
+func retryable(err error) (bool, time.Duration) {
+	s := status.Convert(err)
+	switch s.Code() {
+	case codes.Canceled,
+		codes.DeadlineExceeded,
+		codes.ResourceExhausted,
+		codes.Aborted,
+		codes.OutOfRange,
+		codes.Unavailable,
+		codes.DataLoss:
+		return true, throttleDelay(s)
+	}
+
+	// Not a retryable error.
+	return false, 0
+}
+
+// throttleDelay returns a duration to wait for if an explicit throttle time
+// is included in the response status.
+func throttleDelay(status *status.Status) time.Duration {
+	for _, detail := range status.Details() {
+		if t, ok := detail.(*errdetails.RetryInfo); ok {
+			return t.RetryDelay.AsDuration()
+		}
+	}
+	return 0
+}
+
+/*
 func (c *client) handleNewConnection(cc *grpc.ClientConn) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -105,3 +325,4 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.Res
 	}
 	return err
 }
+*/
diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
index bf55eccfb8b..dbd8f47998b 100644
--- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
+++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
@@ -24,12 +24,10 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/encoding/gzip"
 	"google.golang.org/grpc/status"
-	"google.golang.org/protobuf/types/known/durationpb"
 
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
 	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
@@ -165,386 +163,6 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
 	}
 }
 
-func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
-	// TODO: Fix this test #1527
-	t.Skip("This test is flaky and needs to be rewritten")
-	mc := runMockCollector(t)
-
-	reconnectionPeriod := 20 * time.Millisecond
-	ctx := context.Background()
-	exp := newGRPCExporter(t, ctx, mc.endpoint,
-		otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}),
-		otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod))
-	defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
-
-	// Wait for a connection.
-	mc.ln.WaitForConn()
-
-	// We'll now stop the collector right away to simulate a connection
-	// dying in the midst of communication or even not existing before.
- require.NoError(t, mc.stop()) - - // first export, it will send disconnected message to the channel on export failure, - // trigger almost immediate reconnection - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // second export, it will detect connection issue, change state of exporter to disconnected and - // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // as a result we have exporter in disconnected state waiting for disconnection message to reconnect - - // resurrect collector - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // make sure reconnection loop hits beginning and goes back to waiting mode - // after hitting beginning of the loop it should reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - // when disconnected exp.Export doesnt send disconnected messages again - // it just quits and return last connection error - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - } - - nmaMetrics := nmc.getMetrics() - - if g, w := len(nmaMetrics), n; g != w { - t.Fatalf("Connected collector: metrics: got %d want %d", g, w) - } - - dMetrics := mc.getMetrics() - // Expecting 0 metrics to have been received by the original but now dead collector - if g, w := len(dMetrics), 0; g != w { - t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) - } - - require.NoError(t, nmc.Stop()) -} - -func TestExporterExportFailureAndRecoveryModes(t *testing.T) { - tts := []struct { - name string - errors []error - rs otlpmetricgrpc.RetrySettings - fn func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) - opts []otlpmetricgrpc.Option - }{ - { - name: "Do not retry if succeeded", - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 success request.") - }, - }, - { - name: "Do not retry if 'error' is ok", - errors: []error{ - status.Error(codes.OK, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error OK request.") - }, - }, - { - name: "Fail three times and succeed", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, - MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - status.Error(codes.Unavailable, "backend under pressure"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 4, mc.metricSvc.requests, "metric service must receive 3 failure requests and 1 success request.") - }, - }, - { - name: "Permanent error should not be retried", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 300 * time.Millisecond, - InitialInterval: 2 * time.Millisecond, 
- MaxInterval: 10 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.InvalidArgument, "invalid arguments"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - metric := mc.getMetrics() - - require.Len(t, metric, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 error requests.") - }, - }, - { - name: "Test all transient errors and succeed", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: 500 * time.Millisecond, - InitialInterval: 1 * time.Millisecond, - MaxInterval: 2 * time.Millisecond, - }, - errors: []error{ - status.Error(codes.Canceled, ""), - status.Error(codes.DeadlineExceeded, ""), - status.Error(codes.ResourceExhausted, ""), - status.Error(codes.Aborted, ""), - status.Error(codes.OutOfRange, ""), - status.Error(codes.Unavailable, ""), - status.Error(codes.DataLoss, ""), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 1) - require.Equal(t, 8, mc.metricSvc.requests, "metric service must receive 9 failure requests and 1 success request.") - }, - }, - { - name: "Retry should honor server throttling", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Minute, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - opts: []otlpmetricgrpc.Option{ - otlpmetricgrpc.WithTimeout(time.Millisecond * 100), - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Second*30), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Equal(t, "context deadline exceeded", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry should fail if server throttling is higher than the MaxElapsedTime", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Nanosecond, - MaxInterval: time.Nanosecond, - }, - errors: []error{ - newThrottlingError(codes.ResourceExhausted, time.Minute), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests and 1 success request.") - }, - }, - { - name: "Retry stops if takes too long", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: true, - MaxElapsedTime: time.Millisecond * 100, - InitialInterval: time.Millisecond * 50, - MaxInterval: time.Millisecond * 50, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, 
exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - - require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.LessOrEqual(t, 1, mc.metricSvc.requests, "metric service must receive at least 1 failure requests.") - }, - }, - { - name: "Disabled retry", - rs: otlpmetricgrpc.RetrySettings{ - Enabled: false, - }, - errors: []error{ - status.Error(codes.Unavailable, "unavailable"), - }, - fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - - require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error()) - - metrics := mc.getMetrics() - - require.Len(t, metrics, 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 failure requests.") - }, - }, - } - - for _, tt := range tts { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: tt.errors, - }) - - opts := []otlpmetricgrpc.Option{ - otlpmetricgrpc.WithRetry(tt.rs), - } - - if len(tt.opts) != 0 { - opts = append(opts, tt.opts...) - } - - exp := newGRPCExporter(t, ctx, mc.endpoint, opts...) - - tt.fn(t, ctx, exp, mc) - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } - -} - -func TestPermanentErrorsShouldNotBeRetried(t *testing.T) { - permanentErrors := []*status.Status{ - status.New(codes.Unknown, "Unknown"), - status.New(codes.InvalidArgument, "InvalidArgument"), - status.New(codes.NotFound, "NotFound"), - status.New(codes.AlreadyExists, "AlreadyExists"), - status.New(codes.FailedPrecondition, "FailedPrecondition"), - status.New(codes.Unimplemented, "Unimplemented"), - status.New(codes.Internal, "Internal"), - status.New(codes.PermissionDenied, ""), - status.New(codes.Unauthenticated, ""), - } - - for _, sts := range permanentErrors { - t.Run(sts.Code().String(), func(t *testing.T) { - ctx := context.Background() - - mc := runMockCollectorWithConfig(t, &mockConfig{ - errors: []error{sts.Err()}, - }) - - exp := newGRPCExporter(t, ctx, mc.endpoint) - - err := exp.Export(ctx, testResource, oneRecord) - require.Error(t, err) - require.Len(t, mc.getMetrics(), 0) - require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 permanent error requests.") - - require.NoError(t, mc.Stop()) - require.NoError(t, exp.Shutdown(ctx)) - }) - } -} - -func newThrottlingError(code codes.Code, duration time.Duration) error { - s := status.New(code, "") - - s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)}) - - return s.Err() -} - -func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 50 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false}), - otlpmetricgrpc.WithReconnectionPeriod(reconnectionPeriod)) - defer func() { require.NoError(t, exp.Shutdown(ctx)) }() - - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. 
- require.NoError(t, mc.stop()) - - // In the test below, we'll stop the collector many times, - // while exporting metrics and test to ensure that we can - // reconnect. - for j := 0; j < 3; j++ { - - // No endpoint up. - require.Error(t, exp.Export(ctx, testResource, oneRecord)) - - // Now resurrect the collector by making a new one but reusing the - // old endpoint, and the collector should reconnect automatically. - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // Give the exporter sometime to reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - require.NoError(t, exp.Export(ctx, testResource, oneRecord)) - } - - nmaMetrics := nmc.getMetrics() - // Expecting 10 metrics that were sampled, given that - if g, w := len(nmaMetrics), n; g != w { - t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) - } - - dMetrics := mc.getMetrics() - // Expecting 0 metrics to have been received by the original but now dead collector - if g, w := len(dMetrics), 0; g != w { - t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) - } - - // Disconnect for the next try. - require.NoError(t, nmc.stop()) - } -} - // This test takes a long time to run: to skip it, run tests using: -short func TestNewExporter_collectorOnBadConnection(t *testing.T) { if testing.Short() { @@ -641,7 +259,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { }() ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetrySettings{Enabled: false})) + exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithTimeout(tt.timeout), otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{Enabled: false})) defer func() { _ = exp.Shutdown(ctx) }() @@ -662,48 +280,31 @@ func TestNewExporter_WithTimeout(t *testing.T) { } } -func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) { +func TestStartErrorInvalidSecurityConfiguration(t *testing.T) { mc := runMockCollector(t) defer func() { _ = mc.stop() }() - ctx := context.Background() client := otlpmetricgrpc.NewClient(otlpmetricgrpc.WithEndpoint(mc.endpoint)) - exp, err := otlpmetric.New(ctx, client) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - - err = exp.Export(ctx, testResource, oneRecord) - - expectedErr := fmt.Sprintf("metrics exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) - - require.Error(t, err) - require.Equal(t, expectedErr, err.Error()) - - defer func() { - _ = exp.Shutdown(ctx) - }() + err := client.Start(context.Background()) + // https://github.com/grpc/grpc-go/blob/a671967dfbaab779d37fd7e597d9248f13806087/clientconn.go#L82 + assert.EqualError(t, err, "grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") } -func TestDisconnected(t *testing.T) { - ctx := context.Background() - // The endpoint is whatever, we want to be disconnected. But we - // setting a blocking connection, so dialing to the invalid - // endpoint actually fails. - exp := newGRPCExporter(t, ctx, "invalid", - otlpmetricgrpc.WithReconnectionPeriod(time.Hour), +func TestStartErrorInvalidAddress(t *testing.T) { + client := otlpmetricgrpc.NewClient( + otlpmetricgrpc.WithInsecure(), + // Validate the connection in Start (which should return the error). 
otlpmetricgrpc.WithDialOption( grpc.WithBlock(), grpc.FailOnNonTempDialError(true), ), + otlpmetricgrpc.WithEndpoint("invalid"), + otlpmetricgrpc.WithReconnectionPeriod(time.Hour), ) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.Error(t, exp.Export(ctx, testResource, oneRecord)) + err := client.Start(context.Background()) + assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`) } func TestEmptyData(t *testing.T) { diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go new file mode 100644 index 00000000000..ccd4ade1389 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_unit_test.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpmetricgrpc + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" +) + +func TestThrottleDuration(t *testing.T) { + c := codes.ResourceExhausted + testcases := []struct { + status *status.Status + expected time.Duration + }{ + { + status: status.New(c, "no retry info"), + expected: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "single retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Millisecond), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 15 * time.Millisecond, + }, + { + status: func() *status.Status { + s, err := status.New(c, "error info").WithDetails( + &errdetails.ErrorInfo{Reason: "no throttle detail"}, + ) + require.NoError(t, err) + return s + }(), + expected: 0, + }, + { + status: func() *status.Status { + s, err := status.New(c, "error and retry info").WithDetails( + &errdetails.ErrorInfo{Reason: "with throttle detail"}, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, + }, + { + status: func() *status.Status { + s, err := status.New(c, "double retry info").WithDetails( + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(13 * time.Minute), + }, + &errdetails.RetryInfo{ + RetryDelay: durationpb.New(15 * time.Minute), + }, + ) + require.NoError(t, err) + return s + }(), + expected: 13 * time.Minute, + }, + } + + for _, tc := range testcases { + t.Run(tc.status.Message(), func(t *testing.T) { + require.Equal(t, tc.expected, throttleDelay(tc.status)) + }) + } +} + +func TestRetryable(t *testing.T) { + retryableCodes := map[codes.Code]bool{ + codes.OK: false, + codes.Canceled: true, + codes.Unknown: false, + codes.InvalidArgument: false, + codes.DeadlineExceeded: true, + codes.NotFound: false, + codes.AlreadyExists: false, + 
codes.PermissionDenied:   false,
+		codes.ResourceExhausted:  true,
+		codes.FailedPrecondition: false,
+		codes.Aborted:            true,
+		codes.OutOfRange:         true,
+		codes.Unimplemented:      false,
+		codes.Internal:           false,
+		codes.Unavailable:        true,
+		codes.DataLoss:           true,
+		codes.Unauthenticated:    false,
+	}
+
+	for c, want := range retryableCodes {
+		got, _ := retryable(status.Error(c, ""))
+		assert.Equalf(t, want, got, "retryable(%s)", c)
+	}
+}
+
+func TestUnstartedStop(t *testing.T) {
+	client := NewClient()
+	assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)
+}
+
+func TestUnstartedUploadMetric(t *testing.T) {
+	client := NewClient()
+	assert.ErrorIs(t, client.UploadMetrics(context.Background(), nil), errShutdown)
+}
+
+func TestExportContextHonorsParentDeadline(t *testing.T) {
+	now := time.Now()
+	ctx, cancel := context.WithDeadline(context.Background(), now)
+	t.Cleanup(cancel)
+
+	// Without a client timeout, the parent deadline should be used.
+	client := newClient(WithTimeout(0))
+	eCtx, eCancel := client.exportContext(ctx)
+	t.Cleanup(eCancel)
+
+	deadline, ok := eCtx.Deadline()
+	assert.True(t, ok, "deadline not propagated to child context")
+	assert.Equal(t, now, deadline)
+}
+
+func TestExportContextHonorsClientTimeout(t *testing.T) {
+	// Setting a timeout should ensure a deadline is set on the context.
+	client := newClient(WithTimeout(1 * time.Second))
+	ctx, cancel := client.exportContext(context.Background())
+	t.Cleanup(cancel)
+
+	_, ok := ctx.Deadline()
+	assert.True(t, ok, "timeout not set as deadline for child context")
+}
+
+func TestExportContextLinksStopSignal(t *testing.T) {
+	rootCtx := context.Background()
+
+	client := newClient(WithInsecure())
+	t.Cleanup(func() { require.NoError(t, client.Stop(rootCtx)) })
+	require.NoError(t, client.Start(rootCtx))
+
+	ctx, cancel := client.exportContext(rootCtx)
+	t.Cleanup(cancel)
+
+	require.False(t, func() bool {
+		select {
+		case <-ctx.Done():
+			return true
+		default:
+		}
+		return false
+	}(), "context should not be done prior to canceling it")
+
+	// The client.stopFunc cancels the client.stopCtx. This should have been
+	// set up as a parent of ctx. Therefore, it should cancel ctx as well.
+	client.stopFunc()
+
+	// Assert this with Eventually to account for goroutine scheduler timing.
+ assert.Eventually(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }, 10*time.Second, time.Microsecond) +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go index ea28c1a7fad..3eecfef39c0 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/mock_collector_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "net" - "runtime" - "strings" "sync" "testing" "time" @@ -94,7 +92,6 @@ type mockCollector struct { metricSvc *mockMetricService endpoint string - ln *listener stopFunc func() stopOnce sync.Once } @@ -160,9 +157,8 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle srv := grpc.NewServer() mc := makeMockCollector(t, mockConfig) collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) - mc.ln = newListener(ln) go func() { - _ = srv.Serve((net.Listener)(mc.ln)) + _ = srv.Serve(ln) }() mc.endpoint = ln.Addr().String() @@ -171,59 +167,3 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle return mc } - -type listener struct { - closeOnce sync.Once - wrapped net.Listener - C chan struct{} -} - -func newListener(wrapped net.Listener) *listener { - return &listener{ - wrapped: wrapped, - C: make(chan struct{}, 1), - } -} - -func (l *listener) Close() error { return l.wrapped.Close() } - -func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } - -// Accept waits for and returns the next connection to the listener. It will -// send a signal on l.C that a connection has been made before returning. -func (l *listener) Accept() (net.Conn, error) { - conn, err := l.wrapped.Accept() - if err != nil { - // Go 1.16 exported net.ErrClosed that could clean up this check, but to - // remain backwards compatible with previous versions of Go that we - // support the following string evaluation is used instead to keep in line - // with the previously recommended way to check this: - // https://github.com/golang/go/issues/4373#issuecomment-353076799 - if strings.Contains(err.Error(), "use of closed network connection") { - // If the listener has been closed, do not allow callers of - // WaitForConn to wait for a connection that will never come. - l.closeOnce.Do(func() { close(l.C) }) - } - return conn, err - } - - select { - case l.C <- struct{}{}: - default: - // If C is full, assume nobody is listening and move on. - } - return conn, nil -} - -// WaitForConn will wait indefintely for a connection to be estabilished with -// the listener before returning. -func (l *listener) WaitForConn() { - for { - select { - case <-l.C: - return - default: - runtime.Gosched() - } - } -} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go index 1145fdc9f5e..685e08aeb67 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/options.go @@ -23,16 +23,28 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" ) -// Option applies an option to the gRPC client. +// Option applies an option to the gRPC driver. type Option interface { applyGRPCOption(*otlpconfig.Config) } -// RetrySettings defines configuration for retrying batches in case of export failure -// using an exponential backoff. 
-type RetrySettings otlpconfig.RetrySettings
+func asGRPCOptions(opts []Option) []otlpconfig.GRPCOption {
+	converted := make([]otlpconfig.GRPCOption, len(opts))
+	for i, o := range opts {
+		converted[i] = otlpconfig.NewGRPCOption(o.applyGRPCOption)
+	}
+	return converted
+}
+
+// RetryConfig defines configuration for retrying export of metric batches
+// that failed to be received by the target endpoint.
+//
+// This configuration does not define any network retry strategy. That is
+// entirely handled by the gRPC ClientConn.
+type RetryConfig retry.Config
 
 type wrappedOption struct {
 	otlpconfig.GRPCOption
@@ -42,22 +54,28 @@ func (w wrappedOption) applyGRPCOption(cfg *otlpconfig.Config) {
 	w.ApplyGRPCOption(cfg)
 }
 
-// WithInsecure disables client transport security for the exporter's gRPC connection
-// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
-// does. Note, by default, client security is required unless WithInsecure is used.
+// WithInsecure disables client transport security for the exporter's gRPC
+// connection just like grpc.WithInsecure()
+// (https://pkg.go.dev/google.golang.org/grpc#WithInsecure) does. Note, by
+// default, client security is required unless WithInsecure is used.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithInsecure() Option {
 	return wrappedOption{otlpconfig.WithInsecure()}
 }
 
-// WithEndpoint allows one to set the endpoint that the exporter will
-// connect to the collector on. If unset, it will instead try to use
-// connect to DefaultCollectorHost:DefaultCollectorPort.
+// WithEndpoint sets the target endpoint the exporter will connect to. If
+// unset, localhost:4317 will be used as a default.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithEndpoint(endpoint string) Option {
 	return wrappedOption{otlpconfig.WithEndpoint(endpoint)}
 }
 
-// WithReconnectionPeriod allows one to set the delay between next connection attempt
-// after failing to connect with the collector.
+// WithReconnectionPeriod sets the minimum amount of time between connection
+// attempts to the target endpoint.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithReconnectionPeriod(rp time.Duration) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.ReconnectionPeriod = rp
@@ -74,25 +92,30 @@ func compressorToCompression(compressor string) otlpconfig.Compression {
 	return otlpconfig.NoCompression
 }
 
-// WithCompressor will set the compressor for the gRPC client to use when sending requests.
-// It is the responsibility of the caller to ensure that the compressor set has been registered
-// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
-// compressors auto-register on import, such as gzip, which can be registered by calling
+// WithCompressor sets the compressor for the gRPC client to use when sending
+// requests. It is the responsibility of the caller to ensure that the
+// compressor set has been registered with google.golang.org/grpc/encoding.
+// This can be done by encoding.RegisterCompressor. Some compressors
+// auto-register on import, such as gzip, which can be registered by calling
 // `import _ "google.golang.org/grpc/encoding/gzip"`.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithCompressor(compressor string) Option {
 	return wrappedOption{otlpconfig.WithCompression(compressorToCompression(compressor))}
 }
 
-// WithHeaders will send the provided headers with gRPC requests.
+// WithHeaders will send the provided headers with each gRPC request.
 func WithHeaders(headers map[string]string) Option {
 	return wrappedOption{otlpconfig.WithHeaders(headers)}
 }
 
-// WithTLSCredentials allows the connection to use TLS credentials
-// when talking to the server. It takes in grpc.TransportCredentials instead
-// of say a Certificate file or a tls.Certificate, because the retrieving of
-// these credentials can be done in many ways e.g. plain file, in code tls.Config
-// or by certificate rotation, so it is up to the caller to decide what to use.
+// WithTLSCredentials allows the connection to use TLS credentials when
+// talking to the server. It takes in grpc.TransportCredentials instead of,
+// say, a Certificate file or a tls.Certificate, because these credentials
+// can be retrieved in many ways, e.g. from a plain file, from an in-code
+// tls.Config, or by certificate rotation, so it is up to the caller to
+// decide what to use.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithTLSCredentials(creds credentials.TransportCredentials) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.Metrics.GRPCCredentials = creds
@@ -100,40 +123,63 @@ func WithTLSCredentials(creds credentials.TransportCredentials) Option {
 }
 
 // WithServiceConfig defines the default gRPC service config used.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithServiceConfig(serviceConfig string) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.ServiceConfig = serviceConfig
 	})}
 }
 
-// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts
-// with some other configuration the GRPC specified via the collector the ones here will
-// take preference since they are set last.
+// WithDialOption sets explicit grpc.DialOptions to use when making a
+// connection. The options here are appended to the internal grpc.DialOptions
+// used, so they will take precedence over any other internal grpc.DialOptions
+// they might conflict with.
+//
+// This option has no effect if WithGRPCConn is used.
 func WithDialOption(opts ...grpc.DialOption) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.DialOptions = opts
 	})}
 }
 
-// WithGRPCConn allows reusing existing gRPC connection when it has already been
-// established for other services. When set, other dial options will be ignored.
+// WithGRPCConn sets conn as the gRPC ClientConn used for all communication.
+//
+// This option takes precedence over any other option that relates to
+// establishing or persisting a gRPC connection to a target endpoint. Any
+// other option of those types passed will be ignored.
+//
+// It is the caller's responsibility to close the passed conn. The client
+// Shutdown method will not close this connection.
 func WithGRPCConn(conn *grpc.ClientConn) Option {
 	return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
 		cfg.GRPCConn = conn
 	})}
 }
 
-// WithTimeout tells the client the max waiting time for the backend to process
-// each metrics batch. If unset, the default will be 10 seconds.
+// WithTimeout sets the max amount of time a client will attempt to export a
+// batch of metrics. This takes precedence over any retry settings defined
+// with WithRetry; once this time limit has been reached, the export is
+// abandoned and the batch of metrics is dropped.
+//
+// If unset, the default timeout will be set to 10 seconds.
 func WithTimeout(duration time.Duration) Option {
 	return wrappedOption{otlpconfig.WithTimeout(duration)}
 }
 
-// WithRetry configures the retry policy for transient errors that may occurs when
-// exporting metrics. An exponential back-off algorithm is used to
-// ensure endpoints are not overwhelmed with retries. If unset, the default
-// retry policy will retry after 5 seconds and increase exponentially after each
-// error for a total of 1 minute.
-func WithRetry(settings RetrySettings) Option {
-	return wrappedOption{otlpconfig.WithRetry(otlpconfig.RetrySettings(settings))}
+// WithRetry sets the retry policy for transient retryable errors that may be
+// returned by the target endpoint when exporting a batch of metrics.
+//
+// If the target endpoint responds with not only a retryable error, but also
+// explicitly returns a backoff time in the response, that time will take
+// precedence over these settings.
+//
+// These settings do not define any network retry strategy. That is entirely
+// handled by the gRPC ClientConn.
+//
+// If unset, the default retry policy will be used. It will retry the export
+// 5 seconds after receiving a retryable error and increase exponentially
+// after each error for no more than a total time of 1 minute.
+func WithRetry(settings RetryConfig) Option {
+	return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))}
 }
diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum
index ecbbaba5f91..abf3eb08c2b 100644
--- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum
+++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum
@@ -4,6 +4,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
 github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=

From f175bba84d8a9c92ed642a55ca24ea3d3b175e12 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Wed, 1 Dec 2021 14:19:07 -0800
Subject: [PATCH 2/5] Update PR number

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 025c9498ef8..847639e7d55 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 - The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329)
 - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412)
-- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting.
(#TBD) +- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2425) ### Removed From 5bf45067de136532c6f90a5ef9f7f1155b70d2d9 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 2 Dec 2021 14:19:53 -0800 Subject: [PATCH 3/5] Update otlpmetrichttp to use retry package --- .../otlp/otlpmetric/otlpmetrichttp/client.go | 277 ++++++++++-------- .../otlpmetric/otlpmetrichttp/client_test.go | 161 ---------- .../otlpmetrichttp/client_unit_test.go | 68 +++++ .../otlp/otlpmetric/otlpmetrichttp/go.mod | 1 - .../otlp/otlpmetric/otlpmetrichttp/options.go | 83 +++++- 5 files changed, 297 insertions(+), 293 deletions(-) create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index e4e5c67e6f9..b6b135c142a 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -21,24 +21,32 @@ import ( "fmt" "io" "io/ioutil" - "math/rand" "net" "net/http" "path" + "strconv" "strings" + "sync" "time" "google.golang.org/protobuf/proto" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) const contentTypeProto = "application/x-protobuf" +var gzPool = sync.Pool{ + New: func() interface{} { + w := gzip.NewWriter(ioutil.Discard) + return w + }, +} + // Keep it in sync with golang's DefaultTransport from net/http! We // have our own copy to avoid handling a situation where the // DefaultTransport is overwritten with some different implementation @@ -57,11 +65,13 @@ var ourTransport = &http.Transport{ } type client struct { - name string - cfg otlpconfig.SignalConfig - generalCfg otlpconfig.Config - client *http.Client - stopCh chan struct{} + name string + cfg otlpconfig.SignalConfig + generalCfg otlpconfig.Config + requestFunc retry.RequestFunc + client *http.Client + stopCh chan struct{} + stopOnce sync.Once } // NewClient creates a new HTTP metric client. 
@@ -73,7 +83,7 @@ func NewClient(opts ...Option) otlpmetric.Client { } for pathPtr, defaultPath := range map[*string]string{ - &cfg.Metrics.URLPath: defaultMetricsPath, + &cfg.Metrics.URLPath: otlpconfig.DefaultMetricsPath, } { tmp := strings.TrimSpace(*pathPtr) if tmp == "" { @@ -86,15 +96,6 @@ func NewClient(opts ...Option) otlpmetric.Client { } *pathPtr = tmp } - if cfg.MaxAttempts <= 0 { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.MaxAttempts > defaultMaxAttempts { - cfg.MaxAttempts = defaultMaxAttempts - } - if cfg.Backoff <= 0 { - cfg.Backoff = defaultBackoff - } httpClient := &http.Client{ Transport: ourTransport, @@ -108,11 +109,12 @@ func NewClient(opts ...Option) otlpmetric.Client { stopCh := make(chan struct{}) return &client{ - name: "metrics", - cfg: cfg.Metrics, - generalCfg: cfg, - stopCh: stopCh, - client: httpClient, + name: "metrics", + cfg: cfg.Metrics, + generalCfg: cfg, + requestFunc: cfg.RetryConfig.RequestFunc(evaluate), + stopCh: stopCh, + client: httpClient, } } @@ -129,7 +131,9 @@ func (d *client) Start(ctx context.Context) error { // Stop shuts down the client and interrupt any in-flight request. func (d *client) Stop(ctx context.Context) error { - close(d.stopCh) + d.stopOnce.Do(func() { + close(d.stopCh) + }) select { case <-ctx.Done(): return ctx.Err() @@ -147,41 +151,150 @@ func (d *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.Res if err != nil { return err } - return d.send(ctx, rawRequest) -} -func (d *client) send(ctx context.Context, rawRequest []byte) error { - address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath) - var cancel context.CancelFunc - ctx, cancel = d.contextWithStop(ctx) + ctx, cancel := d.contextWithStop(ctx) defer cancel() - for i := 0; i < d.generalCfg.MaxAttempts; i++ { - response, err := d.singleSend(ctx, rawRequest, address) + + request, err := d.newRequest(rawRequest) + if err != nil { + return err + } + + return d.requestFunc(ctx, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + request.reset(ctx) + resp, err := d.client.Do(request.Request) if err != nil { return err } - // We don't care about the body, so try to read it - // into /dev/null and close it immediately. The - // reading part is to facilitate connection reuse. - _, _ = io.Copy(ioutil.Discard, response.Body) - _ = response.Body.Close() - switch response.StatusCode { + + var rErr error + switch resp.StatusCode { case http.StatusOK: - return nil - case http.StatusTooManyRequests: - fallthrough - case http.StatusServiceUnavailable: - select { - case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)): - continue - case <-ctx.Done(): - return ctx.Err() + // Success, do not retry. + case http.StatusTooManyRequests, + http.StatusServiceUnavailable: + // Retry-able failure. + rErr = newResponseError(resp.Header) + + // Going to retry, drain the body to reuse the connection. 
+			if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
+				_ = resp.Body.Close()
+				return err
 			}
 		default:
-			return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status)
+			rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
+		}
+
+		if err := resp.Body.Close(); err != nil {
+			return err
+		}
+		return rErr
+	})
+}
+
+func (d *client) newRequest(body []byte) (request, error) {
+	address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath)
+	r, err := http.NewRequest(http.MethodPost, address, nil)
+	if err != nil {
+		return request{Request: r}, err
+	}
+
+	for k, v := range d.cfg.Headers {
+		r.Header.Set(k, v)
+	}
+	r.Header.Set("Content-Type", contentTypeProto)
+
+	req := request{Request: r}
+	switch Compression(d.cfg.Compression) {
+	case NoCompression:
+		r.ContentLength = (int64)(len(body))
+		req.bodyReader = bodyReader(body)
+	case GzipCompression:
+		// Ensure the content length is not used.
+		r.ContentLength = -1
+		r.Header.Set("Content-Encoding", "gzip")
+
+		gz := gzPool.Get().(*gzip.Writer)
+		defer gzPool.Put(gz)
+
+		var b bytes.Buffer
+		gz.Reset(&b)
+
+		if _, err := gz.Write(body); err != nil {
+			return req, err
+		}
+		// Close needs to be called to ensure the body is fully written.
+		if err := gz.Close(); err != nil {
+			return req, err
+		}
+
+		req.bodyReader = bodyReader(b.Bytes())
+	}
+
+	return req, nil
+}
+
+// bodyReader returns a closure returning a new reader for buf.
+func bodyReader(buf []byte) func() io.ReadCloser {
+	return func() io.ReadCloser {
+		return ioutil.NopCloser(bytes.NewReader(buf))
+	}
+}
+
+// request wraps an http.Request with a resettable body reader.
+type request struct {
+	*http.Request
+
+	// bodyReader allows the same body to be used for multiple requests.
+	bodyReader func() io.ReadCloser
+}
+
+// reset reinitializes the request Body and uses ctx for the request.
+func (r *request) reset(ctx context.Context) {
+	r.Body = r.bodyReader()
+	r.Request = r.Request.WithContext(ctx)
+}
+
+// retryableError represents a request failure that can be retried.
+type retryableError struct {
+	throttle int64 // delay-seconds parsed from any Retry-After header
+}
+
+// newResponseError returns a retryableError and will extract any explicit
+// throttle delay contained in headers.
+func newResponseError(header http.Header) error {
+	var rErr retryableError
+	if s, ok := header["Retry-After"]; ok {
+		if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
+			rErr.throttle = t
 		}
 	}
-	return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts)
+	return rErr
+}
+
+func (e retryableError) Error() string {
+	return "retry-able request failure"
+}
+
+// evaluate returns whether err is retry-able. If it is and it includes an
+// explicit throttling delay, that delay is also returned.
+func evaluate(err error) (bool, time.Duration) {
+	if err == nil {
+		return false, 0
+	}
+
+	rErr, ok := err.(retryableError)
+	if !ok {
+		return false, 0
+	}
+
+	return true, time.Duration(rErr.throttle) * time.Second // Retry-After is delay-seconds
 }
 
 func (d *client) getScheme() string {
@@ -191,26 +304,6 @@ func (d *client) getScheme() string {
 	return "https"
 }
 
-func getWaitDuration(backoff time.Duration, i int) time.Duration {
-	// Strategy: after nth failed attempt, attempt resending after
-	// k * initialBackoff + jitter, where k is a random number in
-	// range [0, 2^n-1), and jitter is a random percentage of
-	// initialBackoff from [-5%, 5%).
- // - // Based on - // https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm - // - // Jitter is our addition. - - // There won't be an overflow, since i is capped to - // defaultMaxAttempts (5). - upperK := (int64)(1) << (i + 1) - jitterPercent := (rand.Float64() - 0.5) / 10. - jitter := jitterPercent * (float64)(backoff) - k := rand.Int63n(upperK) - return (time.Duration)(k)*backoff + (time.Duration)(jitter) -} - func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { // Unify the parent context Done signal with the client's stop // channel. @@ -226,51 +319,3 @@ func (d *client) contextWithStop(ctx context.Context) (context.Context, context. }(ctx, cancel) return ctx, cancel } - -func (d *client) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) { - request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil) - if err != nil { - return nil, err - } - bodyReader, contentLength, headers := d.prepareBody(rawRequest) - // Not closing bodyReader through defer, the HTTP Client's - // Transport will do it for us - request.Body = bodyReader - request.ContentLength = contentLength - for key, values := range headers { - for _, value := range values { - request.Header.Add(key, value) - } - } - return d.client.Do(request) -} - -func (d *client) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) { - var bodyReader io.ReadCloser - headers := http.Header{} - for k, v := range d.cfg.Headers { - headers.Set(k, v) - } - contentLength := (int64)(len(rawRequest)) - headers.Set("Content-Type", contentTypeProto) - requestReader := bytes.NewBuffer(rawRequest) - switch Compression(d.cfg.Compression) { - case NoCompression: - bodyReader = ioutil.NopCloser(requestReader) - case GzipCompression: - preader, pwriter := io.Pipe() - go func() { - defer pwriter.Close() - gzipper := gzip.NewWriter(pwriter) - defer gzipper.Close() - _, err := io.Copy(gzipper, requestReader) - if err != nil { - otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err)) - } - }() - headers.Set("Content-Encoding", "gzip") - bodyReader = preader - contentLength = -1 - } - return bodyReader, contentLength, headers -} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 4fb116972e9..e15d8b35d6d 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -16,7 +16,6 @@ package otlpmetrichttp_test import ( "context" - "fmt" "net/http" "os" "testing" @@ -144,32 +143,6 @@ func TestExporterShutdown(t *testing.T) { }) } -func TestRetry(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - client := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(len(statuses)+1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, client) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.NoError(t, err) - assert.Len(t, mc.GetMetrics(), 1) -} - func TestTimeout(t *testing.T) { mcCfg := mockCollectorConfig{ InjectDelay: 100 * time.Millisecond, @@ -191,58 +164,6 @@ func 
TestTimeout(t *testing.T) { assert.Equal(t, true, os.IsTimeout(err)) } -func TestRetryFailed(t *testing.T) { - statuses := []int{ - http.StatusTooManyRequests, - http.StatusServiceUnavailable, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) -} - -func TestNoRetry(t *testing.T) { - statuses := []int{ - http.StatusBadRequest, - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(len(statuses)+1), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Equal(t, fmt.Sprintf("failed to send metrics to http://%s/v1/metrics with HTTP status 400 Bad Request", mc.endpoint), err.Error()) - assert.Empty(t, mc.GetMetrics()) -} - func TestEmptyData(t *testing.T) { mcCfg := mockCollectorConfig{} mc := runMockCollector(t, mcCfg) @@ -263,88 +184,6 @@ func TestEmptyData(t *testing.T) { assert.NotEmpty(t, mc.GetMetrics()) } -func TestUnreasonableMaxAttempts(t *testing.T) { - // Max attempts is 5, we set collector to fail 7 times and try - // to configure max attempts to be either negative or too - // large. Since we set max attempts to 5 in such cases, - // exporting to the collector should fail. - type testcase struct { - name string - maxAttempts int - } - for _, tc := range []testcase{ - { - name: "negative max attempts", - maxAttempts: -3, - }, - { - name: "too large max attempts", - maxAttempts: 10, - }, - } { - t.Run(tc.name, func(t *testing.T) { - statuses := make([]int, 0, 7) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithMaxAttempts(tc.maxAttempts), - otlpmetrichttp.WithBackoff(time.Millisecond), - ) - ctx := context.Background() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(ctx)) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) - }) - } -} - -func TestUnreasonableBackoff(t *testing.T) { - // This sets backoff to negative value, which gets corrected - // to default backoff instead of being used. Default max - // attempts is 5, so we set the collector to fail 4 times, but - // we set the deadline to 3 times of the default backoff, so - // this should show that deadline is not met, meaning that the - // retries weren't immediate (as negative backoff could - // imply). 
- statuses := make([]int, 0, 4) - for i := 0; i < cap(statuses); i++ { - statuses = append(statuses, http.StatusTooManyRequests) - } - mcCfg := mockCollectorConfig{ - InjectHTTPStatus: statuses, - } - mc := runMockCollector(t, mcCfg) - defer mc.MustStop(t) - driver := otlpmetrichttp.NewClient( - otlpmetrichttp.WithEndpoint(mc.Endpoint()), - otlpmetrichttp.WithInsecure(), - otlpmetrichttp.WithBackoff(-time.Millisecond), - ) - ctx, cancel := context.WithTimeout(context.Background(), 3*(300*time.Millisecond)) - defer cancel() - exporter, err := otlpmetric.New(ctx, driver) - require.NoError(t, err) - defer func() { - assert.NoError(t, exporter.Shutdown(context.Background())) - }() - err = exporter.Export(ctx, testResource, oneRecord) - assert.Error(t, err) - assert.Empty(t, mc.GetMetrics()) -} - func TestCancelledContext(t *testing.T) { statuses := []int{ http.StatusBadRequest, diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go new file mode 100644 index 00000000000..4ba01c85e5e --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_unit_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package otlpmetrichttp + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnreasonableBackoff(t *testing.T) { + cIface := NewClient( + WithEndpoint("http://localhost"), + WithInsecure(), + WithBackoff(-time.Microsecond), + ) + require.IsType(t, &client{}, cIface) + c := cIface.(*client) + assert.True(t, c.generalCfg.RetryConfig.Enabled) + assert.Equal(t, 5*time.Second, c.generalCfg.RetryConfig.InitialInterval) + assert.Equal(t, 300*time.Millisecond, c.generalCfg.RetryConfig.MaxInterval) + assert.Equal(t, time.Minute, c.generalCfg.RetryConfig.MaxElapsedTime) +} + +func TestUnreasonableMaxAttempts(t *testing.T) { + type testcase struct { + name string + maxAttempts int + } + for _, tc := range []testcase{ + { + name: "negative max attempts", + maxAttempts: -3, + }, + { + name: "too large max attempts", + maxAttempts: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cIface := NewClient( + WithEndpoint("http://localhost"), + WithInsecure(), + WithMaxAttempts(tc.maxAttempts), + ) + require.IsType(t, &client{}, cIface) + c := cIface.(*client) + assert.True(t, c.generalCfg.RetryConfig.Enabled) + assert.Equal(t, 5*time.Second, c.generalCfg.RetryConfig.InitialInterval) + assert.Equal(t, 30*time.Second, c.generalCfg.RetryConfig.MaxInterval) + assert.Equal(t, 145*time.Second, c.generalCfg.RetryConfig.MaxElapsedTime) + }) + } +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 14740d119fe..14a3a27fa18 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -4,7 +4,6 @@ go 1.16 require ( github.com/stretchr/testify v1.7.0 - go.opentelemetry.io/otel v1.2.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.25.0 go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/options.go b/exporters/otlp/otlpmetric/otlpmetrichttp/options.go index ccb59415a3c..95ac42fd7f0 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/options.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/options.go @@ -19,19 +19,7 @@ import ( "time" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig" -) - -const ( - // defaultMaxAttempts describes how many times the driver - // should retry the sending of the payload in case of a - // retryable error. - defaultMaxAttempts int = 5 - // defaultMetricsPath is a default URL path for endpoint that - // receives metrics. - defaultMetricsPath string = "/v1/metrics" - // defaultBackoff is a default base backoff time used in the - // exponential backoff strategy. - defaultBackoff time.Duration = 300 * time.Millisecond + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/retry" ) // Compression describes the compression used for payloads sent to the @@ -52,6 +40,10 @@ type Option interface { applyHTTPOption(*otlpconfig.Config) } +// RetryConfig defines configuration for retrying batches in case of export +// failure using an exponential backoff. +type RetryConfig retry.Config + type wrappedOption struct { otlpconfig.HTTPOption } @@ -84,15 +76,67 @@ func WithURLPath(urlPath string) Option { // will try to send the payload in case of retryable errors. // The max attempts is limited to at most 5 retries. If unset, // default (5) will be used. +// +// Deprecated: Use WithRetry instead. 
 func WithMaxAttempts(maxAttempts int) Option {
-	return wrappedOption{otlpconfig.WithMaxAttempts(maxAttempts)}
+	if maxAttempts > 5 || maxAttempts < 0 {
+		maxAttempts = 5
+	}
+	return wrappedOption{
+		otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) {
+			cfg.RetryConfig.Enabled = true
+
+			var (
+				init = cfg.RetryConfig.InitialInterval
+				maxI = cfg.RetryConfig.MaxInterval
+				maxE = cfg.RetryConfig.MaxElapsedTime
+			)
+
+			if init == 0 {
+				init = retry.DefaultConfig.InitialInterval
+			}
+			if maxI == 0 {
+				maxI = retry.DefaultConfig.MaxInterval
+			}
+			if maxE == 0 {
+				maxE = retry.DefaultConfig.MaxElapsedTime
+			}
+			attempts := int64(maxE+init) / int64(maxI)
+
+			if int64(maxAttempts) == attempts {
+				return
+			}
+
+			maxE = time.Duration(int64(maxAttempts)*int64(maxI)) - init
+
+			cfg.RetryConfig.InitialInterval = init
+			cfg.RetryConfig.MaxInterval = maxI
+			cfg.RetryConfig.MaxElapsedTime = maxE
+		}),
+	}
 }
 
 // WithBackoff tells the driver to use the duration as a base of the
 // exponential backoff strategy. If unset, default (300ms) will be
 // used.
+//
+// Deprecated: Use WithRetry instead.
 func WithBackoff(duration time.Duration) Option {
-	return wrappedOption{otlpconfig.WithBackoff(duration)}
+	if duration < 0 {
+		duration = 300 * time.Millisecond
+	}
+	return wrappedOption{
+		otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) {
+			cfg.RetryConfig.Enabled = true
+			cfg.RetryConfig.MaxInterval = duration
+			if cfg.RetryConfig.InitialInterval == 0 {
+				cfg.RetryConfig.InitialInterval = retry.DefaultConfig.InitialInterval
+			}
+			if cfg.RetryConfig.MaxElapsedTime == 0 {
+				cfg.RetryConfig.MaxElapsedTime = retry.DefaultConfig.MaxElapsedTime
+			}
+		}),
+	}
 }
 
 // WithTLSClientConfig can be used to set up a custom TLS
@@ -120,3 +164,12 @@ func WithHeaders(headers map[string]string) Option {
 func WithTimeout(duration time.Duration) Option {
 	return wrappedOption{otlpconfig.WithTimeout(duration)}
 }
+
+// WithRetry configures the retry policy for transient errors that may occur
+// when exporting metrics. An exponential back-off algorithm is used to ensure
+// endpoints are not overwhelmed with retries. If unset, the default retry
+// policy will retry after 5 seconds and increase exponentially after each
+// error for a total of 1 minute.
+func WithRetry(rc RetryConfig) Option {
+	return wrappedOption{otlpconfig.WithRetry(retry.Config(rc))}
+}

From dea4336d21a83ad10b612b59cb56c58c4ea9aaf8 Mon Sep 17 00:00:00 2001
From: Tyler Yahn
Date: Fri, 3 Dec 2021 07:53:38 -0800
Subject: [PATCH 4/5] Remove old commented code

---
 .../otlp/otlpmetric/otlpmetricgrpc/client.go  | 54 ------------------
 1 file changed, 54 deletions(-)

diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
index 9ef2c9eb520..9192930e22a 100644
--- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
+++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
@@ -273,57 +273,3 @@ func throttleDelay(status *status.Status) time.Duration {
 	}
 	return 0
 }
-
-/*
-func (c *client) handleNewConnection(cc *grpc.ClientConn) {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-	if cc != nil {
-		c.metricsClient = colmetricpb.NewMetricsServiceClient(cc)
-	} else {
-		c.metricsClient = nil
-	}
-}
-
-// Start establishes a connection to the collector.
-func (c *client) Start(ctx context.Context) error {
-	return c.connection.StartConnection(ctx)
-}
-
-// Stop shuts down the connection to the collector.
-func (c *client) Stop(ctx context.Context) error { - return c.connection.Shutdown(ctx) -} - -// UploadMetrics sends a batch of metrics to the collector. -func (c *client) UploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { - if !c.connection.Connected() { - return fmt.Errorf("metrics exporter is disconnected from the server %s: %w", c.connection.SCfg.Endpoint, c.connection.LastConnectError()) - } - - ctx, cancel := c.connection.ContextWithStop(ctx) - defer cancel() - ctx, tCancel := context.WithTimeout(ctx, c.connection.SCfg.Timeout) - defer tCancel() - - ctx = c.connection.ContextWithMetadata(ctx) - err := func() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.metricsClient == nil { - return errNoClient - } - - return c.connection.DoRequest(ctx, func(ctx context.Context) error { - _, err := c.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{ - ResourceMetrics: protoMetrics, - }) - return err - }) - }() - if err != nil { - c.connection.SetStateDisconnected(err) - } - return err -} -*/ From 40f092ece4956221e36c2186a0d66fafe8f85842 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 3 Dec 2021 08:16:35 -0800 Subject: [PATCH 5/5] Add all other external changes to changelog --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 847639e7d55..724601c4e43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,21 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add the `WithRetry` `Option` and the `RetryConfig` type to the `go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp` package to specify retry behavior consistently. (#2425) + ### Changed - The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329) - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412) - The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2425) +- The `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetricgrpc".RetrySettings` type is renamed to `RetryConfig`. (#2425) + +### Deprecated + +- Deprecated the `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp".WithMaxAttempts` `Option`, use the new `WithRetry` `Option` instead. (#2425) +- Deprecated the `"go.opentelemetry.io/otel/exporter/otel/otlpmetric/otlpmetrichttp".WithBackoff` `Option`, use the new `WithRetry` `Option` instead. (#2425) ### Removed
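
Reviewer note, appended after the series and not part of any patch: a minimal sketch of how the retry surface introduced above is expected to be used from application code once this lands. It relies only on identifiers that appear in the diffs (`otlpmetricgrpc.NewClient`, `WithRetry`, `RetryConfig` with the `Enabled`/`InitialInterval`/`MaxInterval`/`MaxElapsedTime` fields exercised by the unit tests, and `otlpmetric.New`); the endpoint value is a placeholder, and the intervals simply restate the `retry.DefaultConfig` values asserted in the tests.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
)

func main() {
	ctx := context.Background()

	// Network-level retries (dialing, reconnects) are handled by the gRPC
	// ClientConn; WithRetry only governs retryable export responses
	// returned by the target endpoint.
	client := otlpmetricgrpc.NewClient(
		otlpmetricgrpc.WithInsecure(),
		otlpmetricgrpc.WithEndpoint("localhost:4317"), // placeholder endpoint
		otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
			Enabled:         true,
			InitialInterval: 5 * time.Second,
			MaxInterval:     30 * time.Second,
			MaxElapsedTime:  time.Minute,
		}),
	)

	exp, err := otlpmetric.New(ctx, client)
	if err != nil {
		log.Fatalf("creating OTLP metric exporter: %v", err)
	}
	defer func() {
		if err := exp.Shutdown(ctx); err != nil {
			log.Printf("shutting down OTLP metric exporter: %v", err)
		}
	}()
}

PATCH 3/5 adds the same `WithRetry`/`RetryConfig` surface to otlpmetrichttp, so the equivalent HTTP client configuration is symmetric.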