diff --git a/CHANGELOG.md b/CHANGELOG.md index 8133966888c..2d083f219ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed +- The `"go.opentelemetry.io/otel/exporter/otel/otlptrace/otlptracegrpc".Client` now uses the underlying gRPC `ClientConn` to handle name resolution, TCP connection establishment (with retries and backoff) and TLS handshakes, and handling errors on established connections by re-resolving the name and reconnecting. (#2329) - Changed the project minimum supported Go version from 1.15 to 1.16. (#2412) ### Removed diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 533c9d991c8..d9246004975 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -53,13 +53,11 @@ func initProvider() func() { // `localhost:30080` endpoint. Otherwise, replace `localhost` with the // endpoint of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns + conn, err := grpc.DialContext(ctx, "localhost:30080", grpc.WithInsecure(), grpc.WithBlock()) + handleErr(err, "failed to create gRPC connection to collector") // Set up a trace exporter - traceExporter, err := otlptracegrpc.New(ctx, - otlptracegrpc.WithInsecure(), - otlptracegrpc.WithEndpoint("localhost:30080"), - otlptracegrpc.WithDialOption(grpc.WithBlock()), - ) + traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) handleErr(err, "failed to create trace exporter") // Register the trace exporter with a TracerProvider, using a batch diff --git a/exporters/otlp/otlptrace/go.mod b/exporters/otlp/otlptrace/go.mod index 77e52b7315a..d24a4ecea4b 100644 --- a/exporters/otlp/otlptrace/go.mod +++ b/exporters/otlp/otlptrace/go.mod @@ -10,7 +10,6 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/trace v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 - google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 google.golang.org/protobuf v1.27.1 ) diff --git a/exporters/otlp/otlptrace/internal/connection/alignment_test.go b/exporters/otlp/otlptrace/internal/connection/alignment_test.go deleted file mode 100644 index aad85902c28..00000000000 --- a/exporters/otlp/otlptrace/internal/connection/alignment_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection - -import ( - "os" - "testing" - "unsafe" - - ottest "go.opentelemetry.io/otel/internal/internaltest" -) - -// Ensure struct alignment prior to running tests. -func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "Connection.lastConnectErrPtr", - Offset: unsafe.Offsetof(Connection{}.lastConnectErrPtr), - }, - } - if !ottest.Aligned8Byte(fields, os.Stderr) { - os.Exit(1) - } - - os.Exit(m.Run()) -} diff --git a/exporters/otlp/otlptrace/internal/connection/connection.go b/exporters/otlp/otlptrace/internal/connection/connection.go deleted file mode 100644 index f2073cb1b3a..00000000000 --- a/exporters/otlp/otlptrace/internal/connection/connection.go +++ /dev/null @@ -1,337 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connection // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/connection" - -import ( - "context" - "math/rand" - "sync" - "sync/atomic" - "time" - "unsafe" - - "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" -) - -type Connection struct { - // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. - lastConnectErrPtr unsafe.Pointer - - // mu protects the Connection as it is accessed by the - // exporter goroutines and background Connection goroutine - mu sync.Mutex - cc *grpc.ClientConn - - // these fields are read-only after constructor is finished - cfg otlpconfig.Config - SCfg otlpconfig.SignalConfig - requestFunc retry.RequestFunc - metadata metadata.MD - newConnectionHandler func(cc *grpc.ClientConn) - - // these channels are created once - disconnectedCh chan bool - backgroundConnectionDoneCh chan struct{} - stopCh chan struct{} - stopOnce sync.Once - - // this is for tests, so they can replace the closing - // routine without a worry of modifying some global variable - // or changing it back to original after the test is done - closeBackgroundConnectionDoneCh func(ch chan struct{}) -} - -func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *Connection { - c := new(Connection) - c.newConnectionHandler = handler - c.cfg = cfg - c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate) - c.SCfg = sCfg - if len(c.SCfg.Headers) > 0 { - c.metadata = metadata.New(c.SCfg.Headers) - } - c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { - close(ch) - } - return c -} - -func (c *Connection) StartConnection(ctx context.Context) error { - c.stopCh = make(chan struct{}) - c.disconnectedCh = make(chan bool, 1) - c.backgroundConnectionDoneCh = make(chan struct{}) - - if err := c.connect(ctx); err == nil { - c.setStateConnected() - } else { - c.SetStateDisconnected(err) - } - go c.indefiniteBackgroundConnection() - - // TODO: proper error handling when initializing connections. - // We can report permanent errors, e.g., invalid settings. - return nil -} - -func (c *Connection) LastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) - if errPtr == nil { - return nil - } - return *errPtr -} - -func (c *Connection) saveLastConnectError(err error) { - var errPtr *error - if err != nil { - errPtr = &err - } - atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) -} - -func (c *Connection) SetStateDisconnected(err error) { - c.saveLastConnectError(err) - select { - case c.disconnectedCh <- true: - default: - } - c.newConnectionHandler(nil) -} - -func (c *Connection) setStateConnected() { - c.saveLastConnectError(nil) -} - -func (c *Connection) Connected() bool { - return c.LastConnectError() == nil -} - -const defaultConnReattemptPeriod = 10 * time.Second - -func (c *Connection) indefiniteBackgroundConnection() { - defer func() { - c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) - }() - - connReattemptPeriod := c.cfg.ReconnectionPeriod - if connReattemptPeriod <= 0 { - connReattemptPeriod = defaultConnReattemptPeriod - } - - // No strong seeding required, nano time can - // already help with pseudo uniqueness. - rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) - - // maxJitterNanos: 70% of the connectionReattemptPeriod - maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) - - for { - // Otherwise these will be the normal scenarios to enable - // reconnection if we trip out. - // 1. If we've stopped, return entirely - // 2. Otherwise block until we are disconnected, and - // then retry connecting - select { - case <-c.stopCh: - return - - case <-c.disconnectedCh: - // Quickly check if we haven't stopped at the - // same time. - select { - case <-c.stopCh: - return - - default: - } - - // Normal scenario that we'll wait for - } - - if err := c.connect(context.Background()); err == nil { - c.setStateConnected() - } else { - // this code is unreachable in most cases - // c.connect does not establish Connection - c.SetStateDisconnected(err) - } - - // Apply some jitter to avoid lockstep retrials of other - // collector-exporters. Lockstep retrials could result in an - // innocent DDOS, by clogging the machine's resources and network. - jitter := time.Duration(rng.Int63n(maxJitterNanos)) - select { - case <-c.stopCh: - return - case <-time.After(connReattemptPeriod + jitter): - } - } -} - -func (c *Connection) connect(ctx context.Context) error { - cc, err := c.dialToCollector(ctx) - if err != nil { - return err - } - c.setConnection(cc) - c.newConnectionHandler(cc) - return nil -} - -// setConnection sets cc as the client Connection and returns true if -// the Connection state changed. -func (c *Connection) setConnection(cc *grpc.ClientConn) bool { - c.mu.Lock() - defer c.mu.Unlock() - - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if c.cc == cc { - return false - } - - // If the previous clientConn was non-nil, close it - if c.cc != nil { - _ = c.cc.Close() - } - c.cc = cc - return true -} - -func (c *Connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - if c.cfg.GRPCConn != nil { - return c.cfg.GRPCConn, nil - } - - dialOpts := []grpc.DialOption{} - if c.cfg.ServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig)) - } - if c.SCfg.GRPCCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.SCfg.GRPCCredentials)) - } else if c.SCfg.Insecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if c.SCfg.Compression == otlpconfig.GzipCompression { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) - } - if len(c.cfg.DialOptions) != 0 { - dialOpts = append(dialOpts, c.cfg.DialOptions...) - } - - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - ctx = c.ContextWithMetadata(ctx) - return grpc.DialContext(ctx, c.SCfg.Endpoint, dialOpts...) -} - -func (c *Connection) ContextWithMetadata(ctx context.Context) context.Context { - if c.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, c.metadata) - } - return ctx -} - -func (c *Connection) Shutdown(ctx context.Context) error { - c.stopOnce.Do(func() { - close(c.stopCh) - }) - // Ensure that the backgroundConnector returns - select { - case <-c.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } - - c.mu.Lock() - cc := c.cc - c.cc = nil - c.mu.Unlock() - - if cc != nil { - return cc.Close() - } - - return nil -} - -func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { - // Unify the parent context Done signal with the Connection's - // stop channel. - ctx, cancel := context.WithCancel(ctx) - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - // Nothing to do, either cancelled or deadline - // happened. - case <-c.stopCh: - cancel() - } - }(ctx, cancel) - return ctx, cancel -} - -func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error { - ctx, cancel := c.ContextWithStop(ctx) - defer cancel() - return c.requestFunc(ctx, func(ctx context.Context) error { - err := fn(ctx) - // nil is converted to OK. - if status.Code(err) == codes.OK { - // Success. - return nil - } - return err - }) -} - -// evaluate returns if err is retry-able and a duration to wait for if an -// explicit throttle time is included in err. -func evaluate(err error) (bool, time.Duration) { - s := status.Convert(err) - switch s.Code() { - case codes.Canceled, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted, - codes.OutOfRange, - codes.Unavailable, - codes.DataLoss: - return true, throttleDelay(s) - } - - // Not a retry-able error. - return false, 0 -} - -// throttleDelay returns a duration to wait for if an explicit throttle time -// is included in the response status. -func throttleDelay(status *status.Status) time.Duration { - for _, detail := range status.Details() { - if t, ok := detail.(*errdetails.RetryInfo); ok { - return t.RetryDelay.AsDuration() - } - } - return 0 -} diff --git a/exporters/otlp/otlptrace/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/internal/otlpconfig/options.go index 8e21250c674..0e2c330a0a5 100644 --- a/exporters/otlp/otlptrace/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/internal/otlpconfig/options.go @@ -20,7 +20,9 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding/gzip" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) @@ -76,6 +78,40 @@ func NewDefaultConfig() Config { return c } +// NewGRPCConfig returns a new Config with all settings applied from opts and +// any unset setting using the default gRPC config values. +func NewGRPCConfig(opts ...GRPCOption) Config { + cfg := NewDefaultConfig() + ApplyGRPCEnvConfigs(&cfg) + for _, opt := range opts { + opt.ApplyGRPCOption(&cfg) + } + + if cfg.ServiceConfig != "" { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultServiceConfig(cfg.ServiceConfig)) + } + if cfg.Traces.GRPCCredentials != nil { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(cfg.Traces.GRPCCredentials)) + } else if cfg.Traces.Insecure { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithInsecure()) + } + if cfg.Traces.Compression == GzipCompression { + cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))) + } + if len(cfg.DialOptions) != 0 { + cfg.DialOptions = append(cfg.DialOptions, cfg.DialOptions...) + } + if cfg.ReconnectionPeriod != 0 { + p := grpc.ConnectParams{ + Backoff: backoff.DefaultConfig, + MinConnectTimeout: cfg.ReconnectionPeriod, + } + cfg.DialOptions = append(cfg.DialOptions, grpc.WithConnectParams(p)) + } + + return cfg +} + type ( // GenericOption applies an option to the HTTP or gRPC driver. GenericOption interface { diff --git a/exporters/otlp/otlptrace/internal/otlptracetest/client.go b/exporters/otlp/otlptrace/internal/otlptracetest/client.go index 0a14f16bfad..aedb8f4a9d2 100644 --- a/exporters/otlp/otlptrace/internal/otlptracetest/client.go +++ b/exporters/otlp/otlptrace/internal/otlptracetest/client.go @@ -56,11 +56,12 @@ func initializeExporter(t *testing.T, client otlptrace.Client) *otlptrace.Export func testClientStopHonorsTimeout(t *testing.T, client otlptrace.Client) { t.Cleanup(func() { - // The test is looking for a failed shut down. Make sure the client is - // actually closed at the end thought to clean up any used resources. - if err := client.Stop(context.Background()); err != nil { - t.Fatalf("failed to stop client: %v", err) - } + // The test is looking for a failed shut down. Call Stop a second time + // with an un-expired context to give the client a second chance at + // cleaning up. There is not guarantee from the Client interface this + // will succeed, therefore, no need to check the error (just give it a + // best try). + _ = client.Stop(context.Background()) }) e := initializeExporter(t, client) @@ -75,11 +76,12 @@ func testClientStopHonorsTimeout(t *testing.T, client otlptrace.Client) { func testClientStopHonorsCancel(t *testing.T, client otlptrace.Client) { t.Cleanup(func() { - // The test is looking for a failed shut down. Make sure the client is - // actually closed at the end thought to clean up any used resources. - if err := client.Stop(context.Background()); err != nil { - t.Fatalf("failed to stop client: %v", err) - } + // The test is looking for a failed shut down. Call Stop a second time + // with an un-expired context to give the client a second chance at + // cleaning up. There is not guarantee from the Client interface this + // will succeed, therefore, no need to check the error (just give it a + // best try). + _ = client.Stop(context.Background()) }) e := initializeExporter(t, client) diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 89a3bbea8d8..03bb359f122 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -17,92 +17,259 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra import ( "context" "errors" - "fmt" "sync" + "time" + "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/connection" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) type client struct { - connection *connection.Connection + endpoint string + dialOpts []grpc.DialOption + metadata metadata.MD + exportTimeout time.Duration + requestFunc retry.RequestFunc - lock sync.Mutex - tracesClient coltracepb.TraceServiceClient + // stopCtx is used as a parent context for all exports. Therefore, when it + // is canceled with the stopFunc all exports are canceled. + stopCtx context.Context + // stopFunc cancels stopCtx, stopping any active exports. + stopFunc context.CancelFunc + + // ourConn keeps track of where conn was created: true if created here on + // Start, or false if passed with an option. This is important on Shutdown + // as the conn should only be closed if created here on start. Otherwise, + // it is up to the processes that passed the conn to close it. + ourConn bool + conn *grpc.ClientConn + tscMu sync.RWMutex + tsc coltracepb.TraceServiceClient } +// Compile time check *client implements otlptrace.Client. var _ otlptrace.Client = (*client)(nil) -var ( - errNoClient = errors.New("no client") -) - // NewClient creates a new gRPC trace client. func NewClient(opts ...Option) otlptrace.Client { - cfg := otlpconfig.NewDefaultConfig() - otlpconfig.ApplyGRPCEnvConfigs(&cfg) - for _, opt := range opts { - opt.applyGRPCOption(&cfg) + return newClient(opts...) +} + +func newClient(opts ...Option) *client { + cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...) + + ctx, cancel := context.WithCancel(context.Background()) + + c := &client{ + endpoint: cfg.Traces.Endpoint, + exportTimeout: cfg.Traces.Timeout, + requestFunc: cfg.RetryConfig.RequestFunc(retryable), + dialOpts: cfg.DialOptions, + stopCtx: ctx, + stopFunc: cancel, + conn: cfg.GRPCConn, } - c := &client{} - c.connection = connection.NewConnection(cfg, cfg.Traces, c.handleNewConnection) + if len(cfg.Traces.Headers) > 0 { + c.metadata = metadata.New(cfg.Traces.Headers) + } return c } -func (c *client) handleNewConnection(cc *grpc.ClientConn) { - c.lock.Lock() - defer c.lock.Unlock() - if cc != nil { - c.tracesClient = coltracepb.NewTraceServiceClient(cc) - } else { - c.tracesClient = nil +// Start establishes a gRPC connection to the collector. +func (c *client) Start(ctx context.Context) error { + if c.conn == nil { + // If the caller did not provide a ClientConn when the client was + // created, create one using the configuration they did provide. + conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...) + if err != nil { + return err + } + // Keep track that we own the lifecycle of this conn and need to close + // it on Shutdown. + c.ourConn = true + c.conn = conn } -} -// Start establishes a connection to the collector. -func (c *client) Start(ctx context.Context) error { - return c.connection.StartConnection(ctx) + // The otlptrace.Client interface states this method is called just once, + // so no need to check if already started. + c.tscMu.Lock() + c.tsc = coltracepb.NewTraceServiceClient(c.conn) + c.tscMu.Unlock() + + return nil } -// Stop shuts down the connection to the collector. +var errAlreadyStopped = errors.New("the client is already stopped") + +// Stop shuts down the client. +// +// Any active connections to a remote endpoint are closed if they were created +// by the client. Any gRPC connection passed during creation using +// WithGRPCConn will not be closed. It is the caller's responsibility to +// handle cleanup of that resource. +// +// This method synchronizes with the UploadTraces method of the client. It +// will wait for any active calls to that method to complete unimpeded, or it +// will cancel any active calls if ctx expires. If ctx expires, the context +// error will be forwarded as the returned error. All client held resources +// will still be released in this situation. +// +// If the client has already stopped, an error will be returned describing +// this. func (c *client) Stop(ctx context.Context) error { - return c.connection.Shutdown(ctx) + // Acquire the c.tscMu lock within the ctx lifetime. + acquired := make(chan struct{}) + go func() { + c.tscMu.Lock() + close(acquired) + }() + var err error + select { + case <-ctx.Done(): + // The Stop timeout is reached. Kill any remaining exports to force + // the clear of the lock and save the timeout error to return and + // signal the shutdown timed out before cleanly stopping. + c.stopFunc() + err = ctx.Err() + + // To ensure the client is not left in a dirty state c.tsc needs to be + // set to nil. To avoid the race condition when doing this, ensure + // that all the exports are killed (initiated by c.stopFunc). + <-acquired + case <-acquired: + } + // Hold the tscMu lock for the rest of the function to ensure no new + // exports are started. + defer c.tscMu.Unlock() + + // The otlptrace.Client interface states this method is called only + // once, but there is no guarantee it is called after Start. Ensure the + // client is started before doing anything and let the called know if they + // made a mistake. + if c.tsc == nil { + return errAlreadyStopped + } + + // Clear c.tsc to signal the client is stopped. + c.tsc = nil + + if c.ourConn { + closeErr := c.conn.Close() + // A context timeout error takes precedence over this error. + if err == nil && closeErr != nil { + err = closeErr + } + } + return err } -// UploadTraces sends a batch of spans to the collector. +var errShutdown = errors.New("the client is shutdown") + +// UploadTraces sends a batch of spans. +// +// Retryable errors from the server will be handled according to any +// RetryConfig the client was created with. func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { - if !c.connection.Connected() { - return fmt.Errorf("traces exporter is disconnected from the server %s: %w", c.connection.SCfg.Endpoint, c.connection.LastConnectError()) + // Hold a read lock to ensure a shut down initiated after this starts does + // not abandon the export. This read lock acquire has less priority than a + // write lock acquire (i.e. Stop), meaning if the client is shutting down + // this will come after the shut down. + c.tscMu.RLock() + defer c.tscMu.RUnlock() + + if c.tsc == nil { + return errShutdown } - ctx, cancel := c.connection.ContextWithStop(ctx) + ctx, cancel := c.exportContext(ctx) defer cancel() - ctx, tCancel := context.WithTimeout(ctx, c.connection.SCfg.Timeout) - defer tCancel() - - ctx = c.connection.ContextWithMetadata(ctx) - err := func() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.tracesClient == nil { - return errNoClient - } - return c.connection.DoRequest(ctx, func(ctx context.Context) error { - _, err := c.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: protoSpans, - }) - return err + + return c.requestFunc(ctx, func(iCtx context.Context) error { + _, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, }) + // nil is converted to OK. + if status.Code(err) == codes.OK { + // Success. + return nil + } + return err + }) +} + +// exportContext returns a copy of parent with an appropriate deadline and +// cancellation function. +// +// It is the callers responsibility to cancel the returned context once its +// use is complete, via the parent or directly with the returned CancelFunc, to +// ensure all resources are correctly released. +func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) { + var ( + ctx context.Context + cancel context.CancelFunc + ) + + if c.exportTimeout > 0 { + ctx, cancel = context.WithTimeout(parent, c.exportTimeout) + } else { + ctx, cancel = context.WithCancel(parent) + } + + if c.metadata.Len() > 0 { + ctx = metadata.NewOutgoingContext(ctx, c.metadata) + } + + // Unify the client stopCtx with the parent. + go func() { + select { + case <-ctx.Done(): + case <-c.stopCtx.Done(): + // Cancel the export as the shutdown has timed out. + cancel() + } }() - if err != nil { - c.connection.SetStateDisconnected(err) + + return ctx, cancel +} + +// retryable returns if err identifies a request that can be retried and a +// duration to wait for if an explicit throttle time is included in err. +func retryable(err error) (bool, time.Duration) { + //func retryable(err error) (bool, time.Duration) { + s := status.Convert(err) + switch s.Code() { + case codes.Canceled, + codes.DeadlineExceeded, + codes.ResourceExhausted, + codes.Aborted, + codes.OutOfRange, + codes.Unavailable, + codes.DataLoss: + return true, throttleDelay(s) } - return err + + // Not a retry-able error. + return false, 0 +} + +// throttleDelay returns a duration to wait for if an explicit throttle time +// is included in the response status. +func throttleDelay(status *status.Status) time.Duration { + for _, detail := range status.Details() { + if t, ok := detail.(*errdetails.RetryInfo); ok { + return t.RetryDelay.AsDuration() + } + } + return 0 } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index d99afbb3f38..a290255f7eb 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -167,120 +167,6 @@ func TestNew_invokeStartThenStopManyTimes(t *testing.T) { } } -func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 20 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), - otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - // Wait for a connection. - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. - require.NoError(t, mc.stop()) - - // first export, it will send disconnected message to the channel on export failure, - // trigger almost immediate reconnection - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // second export, it will detect connection issue, change state of exporter to disconnected and - // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // as a result we have exporter in disconnected state waiting for disconnection message to reconnect - - // resurrect collector - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // make sure reconnection loop hits beginning and goes back to waiting mode - // after hitting beginning of the loop it should reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - // when disconnected exp.ExportSpans doesnt send disconnected messages again - // it just quits and return last connection error - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - } - - nmaSpans := nmc.getSpans() - - // Expecting 10 spans that were sampled, given that - if g, w := len(nmaSpans), n; g != w { - t.Fatalf("Connected collector: spans: got %d want %d", g, w) - } - - dSpans := mc.getSpans() - // Expecting 0 spans to have been received by the original but now dead collector - if g, w := len(dSpans), 0; g != w { - t.Fatalf("Disconnected collector: spans: got %d want %d", g, w) - } - - require.NoError(t, nmc.Stop()) -} - -func TestNew_collectorConnectionDiesThenReconnects(t *testing.T) { - // TODO: Fix this test #1527 - t.Skip("This test is flaky and needs to be rewritten") - mc := runMockCollector(t) - - reconnectionPeriod := 50 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), - otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod)) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - mc.ln.WaitForConn() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. - require.NoError(t, mc.stop()) - - // In the test below, we'll stop the collector many times, - // while exporting traces and test to ensure that we can - // reconnect. - for j := 0; j < 3; j++ { - - // No endpoint up. - require.Error(t, exp.ExportSpans(ctx, roSpans)) - - // Now resurrect the collector by making a new one but reusing the - // old endpoint, and the collector should reconnect automatically. - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // Give the exporter sometime to reconnect - nmc.ln.WaitForConn() - - n := 10 - for i := 0; i < n; i++ { - require.NoError(t, exp.ExportSpans(ctx, roSpans)) - } - - nmaSpans := nmc.getSpans() - // Expecting 10 spans that were sampled, given that - if g, w := len(nmaSpans), n; g != w { - t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) - } - - dSpans := mc.getSpans() - // Expecting 0 spans to have been received by the original but now dead collector - if g, w := len(dSpans), 0; g != w { - t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) - } - - // Disconnect for the next try. - require.NoError(t, nmc.stop()) - } -} - // This test takes a long time to run: to skip it, run tests using: -short func TestNew_collectorOnBadConnection(t *testing.T) { if testing.Short() { @@ -352,24 +238,14 @@ func TestExportSpansTimeoutHonored(t *testing.T) { require.Equal(t, codes.DeadlineExceeded, status.Convert(err).Code()) } -func TestNew_withInvalidSecurityConfiguration(t *testing.T) { +func TestStartErrorInvalidSecurityConfiguration(t *testing.T) { mc := runMockCollector(t) t.Cleanup(func() { require.NoError(t, mc.stop()) }) - ctx := context.Background() - driver := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(mc.endpoint)) - exp, err := otlptrace.New(ctx, driver) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - err = exp.ExportSpans(ctx, roSpans) - - expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) - - require.Error(t, err) - require.Equal(t, expectedErr, err.Error()) + client := otlptracegrpc.NewClient(otlptracegrpc.WithEndpoint(mc.endpoint)) + err := client.Start(context.Background()) + // https://github.com/grpc/grpc-go/blob/a671967dfbaab779d37fd7e597d9248f13806087/clientconn.go#L82 + assert.EqualError(t, err, "grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") } func TestNew_withMultipleAttributeTypes(t *testing.T) { @@ -492,21 +368,19 @@ func TestNew_withMultipleAttributeTypes(t *testing.T) { } } -func TestDisconnected(t *testing.T) { - ctx := context.Background() - // The endpoint is whatever, we want to be disconnected. But we - // setting a blocking connection, so dialing to the invalid - // endpoint actually fails. - exp := newGRPCExporter(t, ctx, "invalid", - otlptracegrpc.WithReconnectionPeriod(time.Hour), +func TestStartErrorInvalidAddress(t *testing.T) { + client := otlptracegrpc.NewClient( + otlptracegrpc.WithInsecure(), + // Validate the connection in Start (which should return the error). otlptracegrpc.WithDialOption( grpc.WithBlock(), grpc.FailOnNonTempDialError(true), ), + otlptracegrpc.WithEndpoint("invalid"), + otlptracegrpc.WithReconnectionPeriod(time.Hour), ) - t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) - - assert.Error(t, exp.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())) + err := client.Start(context.Background()) + assert.EqualError(t, err, `connection error: desc = "transport: error while dialing: dial tcp: address invalid: missing port in address"`) } func TestEmptyData(t *testing.T) { diff --git a/exporters/otlp/otlptrace/internal/connection/connection_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go similarity index 59% rename from exporters/otlp/otlptrace/internal/connection/connection_test.go rename to exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go index 8c0c86ed261..5c43df1e322 100644 --- a/exporters/otlp/otlptrace/internal/connection/connection_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_unit_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package connection +package otlptracegrpc import ( "context" @@ -25,8 +25,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" - - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry" ) func TestThrottleDuration(t *testing.T) { @@ -98,8 +96,8 @@ func TestThrottleDuration(t *testing.T) { } } -func TestEvaluate(t *testing.T) { - retryable := map[codes.Code]bool{ +func TestRetryable(t *testing.T) { + retryableCodes := map[codes.Code]bool{ codes.OK: false, codes.Canceled: true, codes.Unknown: false, @@ -119,27 +117,77 @@ func TestEvaluate(t *testing.T) { codes.Unauthenticated: false, } - for c, want := range retryable { - got, _ := evaluate(status.Error(c, "")) + for c, want := range retryableCodes { + got, _ := retryable(status.Error(c, "")) assert.Equalf(t, want, got, "evaluate(%s)", c) } } -func TestDoRequest(t *testing.T) { - ev := func(error) (bool, time.Duration) { return false, 0 } - - c := new(Connection) - c.requestFunc = retry.Config{}.RequestFunc(ev) - c.stopCh = make(chan struct{}) - - ctx := context.Background() - assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { - return nil - })) - assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error { - return status.Error(codes.OK, "") - })) - assert.ErrorIs(t, c.DoRequest(ctx, func(ctx context.Context) error { - return assert.AnError - }), assert.AnError) +func TestUnstartedStop(t *testing.T) { + client := NewClient() + assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped) +} + +func TestUnstartedUploadTrace(t *testing.T) { + client := NewClient() + assert.ErrorIs(t, client.UploadTraces(context.Background(), nil), errShutdown) +} + +func TestExportContextHonorsParentDeadline(t *testing.T) { + now := time.Now() + ctx, cancel := context.WithDeadline(context.Background(), now) + t.Cleanup(cancel) + + // Without a client timeout, the parent deadline should be used. + client := newClient(WithTimeout(0)) + eCtx, eCancel := client.exportContext(ctx) + t.Cleanup(eCancel) + + deadline, ok := eCtx.Deadline() + assert.True(t, ok, "deadline not propagated to child context") + assert.Equal(t, now, deadline) +} + +func TestExportContextHonorsClientTimeout(t *testing.T) { + // Setting a timeout should ensure a deadline is set on the context. + client := newClient(WithTimeout(1 * time.Second)) + ctx, cancel := client.exportContext(context.Background()) + t.Cleanup(cancel) + + _, ok := ctx.Deadline() + assert.True(t, ok, "timeout not set as deadline for child context") +} + +func TestExportContextLinksStopSignal(t *testing.T) { + rootCtx := context.Background() + + client := newClient(WithInsecure()) + t.Cleanup(func() { require.NoError(t, client.Stop(rootCtx)) }) + require.NoError(t, client.Start(rootCtx)) + + ctx, cancel := client.exportContext(rootCtx) + t.Cleanup(cancel) + + require.False(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }(), "context should not be done prior to canceling it") + + // The client.stopFunc cancels the client.stopCtx. This should have been + // setup as a parent of ctx. Therefore, it should cancel ctx as well. + client.stopFunc() + + // Assert this with Eventually to account for goroutine scheduler timing. + assert.Eventually(t, func() bool { + select { + case <-ctx.Done(): + return true + default: + } + return false + }, 10*time.Second, time.Microsecond) } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/go.mod b/exporters/otlp/otlptrace/otlptracegrpc/go.mod index b68baefc1ed..2959daf4b56 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/go.mod +++ b/exporters/otlp/otlptrace/otlptracegrpc/go.mod @@ -9,7 +9,9 @@ require ( go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/proto/otlp v0.11.0 go.uber.org/goleak v1.1.12 + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 google.golang.org/grpc v1.42.0 + google.golang.org/protobuf v1.27.1 ) replace go.opentelemetry.io/otel => ../../../.. diff --git a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go index 1987fd31708..359202aed05 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go @@ -18,8 +18,6 @@ import ( "context" "fmt" "net" - "runtime" - "strings" "sync" "testing" "time" @@ -101,7 +99,6 @@ type mockCollector struct { traceSvc *mockTraceService endpoint string - ln *listener stopFunc func() stopOnce sync.Once } @@ -171,70 +168,12 @@ func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockColle srv := grpc.NewServer() mc := makeMockCollector(t, mockConfig) collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc) - mc.ln = newListener(ln) go func() { - _ = srv.Serve((net.Listener)(mc.ln)) + _ = srv.Serve(ln) }() mc.endpoint = ln.Addr().String() - // srv.Stop calls Close on mc.ln. mc.stopFunc = srv.Stop return mc } - -type listener struct { - closeOnce sync.Once - wrapped net.Listener - C chan struct{} -} - -func newListener(wrapped net.Listener) *listener { - return &listener{ - wrapped: wrapped, - C: make(chan struct{}, 1), - } -} - -func (l *listener) Close() error { return l.wrapped.Close() } - -func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } - -// Accept waits for and returns the next connection to the listener. It will -// send a signal on l.C that a connection has been made before returning. -func (l *listener) Accept() (net.Conn, error) { - conn, err := l.wrapped.Accept() - if err != nil { - // Go 1.16 exported net.ErrClosed that could clean up this check, but to - // remain backwards compatible with previous versions of Go that we - // support the following string evaluation is used instead to keep in line - // with the previously recommended way to check this: - // https://github.com/golang/go/issues/4373#issuecomment-353076799 - if strings.Contains(err.Error(), "use of closed network connection") { - // If the listener has been closed, do not allow callers of - // WaitForConn to wait for a connection that will never come. - l.closeOnce.Do(func() { close(l.C) }) - } - return conn, err - } - - select { - case l.C <- struct{}{}: - default: - // If C is full, assume nobody is listening and move on. - } - return conn, nil -} - -// WaitForConn will wait indefintely for a connection to be estabilished with -// the listener before returning. -func (l *listener) WaitForConn() { - for { - select { - case <-l.C: - return - default: - runtime.Gosched() - } - } -} diff --git a/exporters/otlp/otlptrace/otlptracegrpc/options.go b/exporters/otlp/otlptrace/otlptracegrpc/options.go index da82e622c65..8948c0e9c0b 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/options.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/options.go @@ -31,8 +31,19 @@ type Option interface { applyGRPCOption(*otlpconfig.Config) } -// RetryConfig defines configuration for retrying batches in case of export -// failure using an exponential backoff. +func asGRPCOptions(opts []Option) []otlpconfig.GRPCOption { + converted := make([]otlpconfig.GRPCOption, len(opts)) + for i, o := range opts { + converted[i] = otlpconfig.NewGRPCOption(o.applyGRPCOption) + } + return converted +} + +// RetryConfig defines configuration for retrying export of span batches that +// failed to be received by the target endpoint. +// +// This configuration does not define any network retry strategy. That is +// entirely handled by the gRPC ClientConn. type RetryConfig retry.Config type wrappedOption struct { @@ -43,22 +54,28 @@ func (w wrappedOption) applyGRPCOption(cfg *otlpconfig.Config) { w.ApplyGRPCOption(cfg) } -// WithInsecure disables client transport security for the exporter's gRPC connection -// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure -// does. Note, by default, client security is required unless WithInsecure is used. +// WithInsecure disables client transport security for the exporter's gRPC +// connection just like grpc.WithInsecure() +// (https://pkg.go.dev/google.golang.org/grpc#WithInsecure) does. Note, by +// default, client security is required unless WithInsecure is used. +// +// This option has no effect if WithGRPCConn is used. func WithInsecure() Option { return wrappedOption{otlpconfig.WithInsecure()} } -// WithEndpoint allows one to set the endpoint that the exporter will -// connect to the collector on. If unset, it will instead try to use -// connect to DefaultCollectorHost:DefaultCollectorPort. +// WithEndpoint sets the target endpoint the exporter will connect to. If +// unset, localhost:4317 will be used as a default. +// +// This option has no effect if WithGRPCConn is used. func WithEndpoint(endpoint string) Option { return wrappedOption{otlpconfig.WithEndpoint(endpoint)} } -// WithReconnectionPeriod allows one to set the delay between next connection attempt -// after failing to connect with the collector. +// WithReconnectionPeriod set the minimum amount of time between connection +// attempts to the target endpoint. +// +// This option has no effect if WithGRPCConn is used. func WithReconnectionPeriod(rp time.Duration) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.ReconnectionPeriod = rp @@ -75,25 +92,30 @@ func compressorToCompression(compressor string) otlpconfig.Compression { return otlpconfig.NoCompression } -// WithCompressor will set the compressor for the gRPC client to use when sending requests. -// It is the responsibility of the caller to ensure that the compressor set has been registered -// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some -// compressors auto-register on import, such as gzip, which can be registered by calling +// WithCompressor sets the compressor for the gRPC client to use when sending +// requests. It is the responsibility of the caller to ensure that the +// compressor set has been registered with google.golang.org/grpc/encoding. +// This can be done by encoding.RegisterCompressor. Some compressors +// auto-register on import, such as gzip, which can be registered by calling // `import _ "google.golang.org/grpc/encoding/gzip"`. +// +// This option has no effect if WithGRPCConn is used. func WithCompressor(compressor string) Option { return wrappedOption{otlpconfig.WithCompression(compressorToCompression(compressor))} } -// WithHeaders will send the provided headers with gRPC requests. +// WithHeaders will send the provided headers with each gRPC requests. func WithHeaders(headers map[string]string) Option { return wrappedOption{otlpconfig.WithHeaders(headers)} } -// WithTLSCredentials allows the connection to use TLS credentials -// when talking to the server. It takes in grpc.TransportCredentials instead -// of say a Certificate file or a tls.Certificate, because the retrieving of -// these credentials can be done in many ways e.g. plain file, in code tls.Config -// or by certificate rotation, so it is up to the caller to decide what to use. +// WithTLSCredentials allows the connection to use TLS credentials when +// talking to the server. It takes in grpc.TransportCredentials instead of say +// a Certificate file or a tls.Certificate, because the retrieving of these +// credentials can be done in many ways e.g. plain file, in code tls.Config or +// by certificate rotation, so it is up to the caller to decide what to use. +// +// This option has no effect if WithGRPCConn is used. func WithTLSCredentials(creds credentials.TransportCredentials) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.Traces.GRPCCredentials = creds @@ -101,40 +123,63 @@ func WithTLSCredentials(creds credentials.TransportCredentials) Option { } // WithServiceConfig defines the default gRPC service config used. +// +// This option has no effect if WithGRPCConn is used. func WithServiceConfig(serviceConfig string) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.ServiceConfig = serviceConfig })} } -// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts -// with some other configuration the GRPC specified via the collector the ones here will -// take preference since they are set last. +// WithDialOption sets explicit grpc.DialOptions to use when making a +// connection. The options here are appended to the internal grpc.DialOptions +// used so they will take precedence over any other internal grpc.DialOptions +// they might conflict with. +// +// This option has no effect if WithGRPCConn is used. func WithDialOption(opts ...grpc.DialOption) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.DialOptions = opts })} } -// WithGRPCConn allows reusing existing gRPC connection when it has already been -// established for other services. When set, other dial options will be ignored. +// WithGRPCConn sets conn as the gRPC ClientConn used for all communication. +// +// This option takes precedence over any other option that relates to +// establishing or persisting a gRPC connection to a target endpoint. Any +// other option of those types passed will be ignored. +// +// It is the callers responsibility to close the passed conn. The client +// Shutdown method will not close this connection. func WithGRPCConn(conn *grpc.ClientConn) Option { return wrappedOption{otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) { cfg.GRPCConn = conn })} } -// WithTimeout tells the driver the max waiting time for the backend to process -// each spans batch. If unset, the default will be 10 seconds. +// WithTimeout sets the max amount of time a client will attempt to export a +// batch of spans. This takes precedence over any retry settings defined with +// WithRetry, once this time limit has been reached the export is abandoned +// and the batch of spans is dropped. +// +// If unset, the default timeout will be set to 10 seconds. func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } -// WithRetry configures the retry policy for transient errors that may occurs -// when exporting traces. An exponential back-off algorithm is used to ensure -// endpoints are not overwhelmed with retries. If unset, the default retry -// policy will retry after 5 seconds and increase exponentially after each -// error for a total of 1 minute. +// WithRetry sets the retry policy for transient retryable errors that may be +// returned by the target endpoint when exporting a batch of spans. +// +// If the target endpoint responds with not only a retryable error, but +// explicitly returns a backoff time in the response. That time will take +// precedence over these settings. +// +// These settings do not define any network retry strategy. That is entirely +// handled by the gRPC ClientConn. +// +// If unset, the default retry policy will be used. It will retry the export +// 5 seconds after receiving a retryable error and increase exponentially +// after each error for no more than a total time of 1 minute. func WithRetry(settings RetryConfig) Option { return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))} }