Skip to content

Commit

Permalink
Allow customizing DialOptions for gRPC plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
t0yv0 committed Nov 2, 2022
1 parent 51f8df7 commit 7a22a2d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 21 deletions.
9 changes: 7 additions & 2 deletions sdk/go/common/resource/plugin/analyzer_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
Expand Down Expand Up @@ -62,8 +63,10 @@ func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) {
}
contract.Assert(path != "")

dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions()...)

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (analyzer)", name),
[]string{host.ServerAddr(), ctx.Pwd}, nil /*env*/)
[]string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -126,7 +129,9 @@ func NewPolicyAnalyzer(
}
}

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env)
dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions()...)

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env, dialOpts)
if err != nil {
// The original error might have been wrapped before being returned from newPlugin. So we look for
// the root cause of the error. This won't work if we switch to Go 1.13's new approach to wrapping.
Expand Down
3 changes: 3 additions & 0 deletions sdk/go/common/resource/plugin/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"

"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors"
Expand All @@ -36,6 +37,8 @@ type Context struct {
Pwd string // the working directory to spawn all plugins in.
Root string // the root directory of the project.

DialOptions []grpc.DialOption // common options for gRPC clients to plugins

tracingSpan opentracing.Span // the OpenTracing span to parent requests within.

cancelFuncs []context.CancelFunc
Expand Down
5 changes: 4 additions & 1 deletion sdk/go/common/resource/plugin/langruntime_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
Expand Down Expand Up @@ -62,7 +63,9 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string,
return nil, err
}

plug, err := newPlugin(ctx, ctx.Pwd, path, runtime, args, nil /*env*/)
dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions()...)

plug, err := newPlugin(ctx, ctx.Pwd, path, runtime, args, nil /*env*/, dialOpts)
if err != nil {
return nil, err
}
Expand Down
33 changes: 18 additions & 15 deletions sdk/go/common/resource/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2021, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@ package plugin
import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
Expand All @@ -27,7 +28,6 @@ import (
"syscall"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"golang.org/x/net/context"
Expand Down Expand Up @@ -115,15 +115,16 @@ var errRunPolicyModuleNotFound = errors.New("pulumi SDK does not support policy
// errPluginNotFound is returned when we try to execute a plugin but it is not found on disk.
var errPluginNotFound = errors.New("plugin not found")

func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) {
// Now that we have the port, go ahead and create a gRPC client connection to it.
conn, err := grpc.Dial(
"127.0.0.1:"+port,
func dialPlugin(portNum int, bin, prefix string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) {
port := fmt.Sprintf("%d", portNum)

dialOptions = append(dialOptions,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
rpcutil.GrpcChannelOptions(),
)

// Now that we have the port, go ahead and create a gRPC client connection to it.
conn, err := grpc.Dial("127.0.0.1:"+port, dialOptions...)
if err != nil {
return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin)
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) {
return conn, nil
}

func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, options ...otgrpc.Option) (*plugin, error) {
func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, dialOptions []grpc.DialOption) (*plugin, error) {
if logging.V(9) {
var argstr string
for i, arg := range args {
Expand Down Expand Up @@ -245,7 +246,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option

// Now that we have a process, we expect it to write a single line to STDOUT: the port it's listening on. We only
// read a byte at a time so that STDOUT contains everything after the first newline.
var port string
var portString string
b := make([]byte, 1)
for {
n, readerr := plug.Stdout.Read(b)
Expand All @@ -259,19 +260,21 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
}

// Fall back to a generic, opaque error.
if port == "" {
if portString == "" {
return nil, errors.Wrapf(readerr, "could not read plugin [%v] stdout", bin)
}
return nil, errors.Wrapf(readerr, "failure reading plugin [%v] stdout (read '%v')", bin, port)
return nil, errors.Wrapf(readerr, "failure reading plugin [%v] stdout (read '%v')",
bin, portString)
}
if n > 0 && b[0] == '\n' {
break
}
port += string(b[:n])
portString += string(b[:n])
}

// Parse the output line (minus the '\n') to ensure it's a numeric port.
if _, err = strconv.Atoi(port); err != nil {
var port int
if port, err = strconv.Atoi(portString); err != nil {
killerr := plug.Kill()
contract.IgnoreError(killerr) // ignoring the error because the existing one trumps it.
return nil, errors.Wrapf(
Expand All @@ -283,7 +286,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
plug.stdoutDone = stdoutDone
go runtrace(plug.Stdout, false, stdoutDone)

conn, err := dialPlugin(port, bin, prefix)
conn, err := dialPlugin(port, bin, prefix, dialOptions)
if err != nil {
return nil, err
}
Expand Down
24 changes: 21 additions & 3 deletions sdk/go/common/resource/plugin/provider_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"

"github.com/blang/semver"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
Expand Down Expand Up @@ -94,7 +96,15 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve
prefix := fmt.Sprintf("%v (resource)", pkg)

if optAttach != "" {
conn, err := dialPlugin(optAttach, pkg.String(), prefix)
port, err := strconv.Atoi(optAttach)
if err != nil {
return nil, fmt.Errorf("Expected a numeric port, got %s in PULUMI_DEBUG_PROVIDERS: %w",
optAttach, err)
}

dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions()...)

conn, err := dialPlugin(port, pkg.String(), prefix, dialOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -122,8 +132,11 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
}

dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions(
otgrpc.SpanDecorator(decorateProviderSpans))...)

plug, err = newPlugin(ctx, ctx.Pwd, path, prefix,
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
[]string{host.ServerAddr()}, env, dialOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,8 +170,13 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve
// NewProviderFromPath creates a new provider by loading the plugin binary located at `path`.
func NewProviderFromPath(host Host, ctx *Context, path string) (Provider, error) {
env := os.Environ()

dialOpts := append(ctx.DialOptions, rpcutil.OpenTracingInterceptorDialOptions(
otgrpc.SpanDecorator(decorateProviderSpans),
)...)

plug, err := newPlugin(ctx, ctx.Pwd, path, "",
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
[]string{host.ServerAddr()}, env, dialOpts)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/go/common/util/rpcutil/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ func OpenTracingStreamClientInterceptor(options ...otgrpc.Option) grpc.StreamCli
return otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer(), options...)
}

// Configures gRPC clients with OpenTracing interceptors.
func OpenTracingInterceptorDialOptions(opts ...otgrpc.Option) []grpc.DialOption {
return []grpc.DialOption{
grpc.WithChainUnaryInterceptor(OpenTracingClientInterceptor(opts...)),
grpc.WithChainStreamInterceptor(OpenTracingStreamClientInterceptor(opts...)),
}
}

// Wraps an opentracing.Tracer to reparent orphan traces with a given
// default parent span.
type reparentingTracer struct {
Expand Down

0 comments on commit 7a22a2d

Please sign in to comment.