Skip to content

Commit

Permalink
Allow attaching to running providers (#8979)
Browse files Browse the repository at this point in the history
* Add Attach call

* Regenerate grpc

* Start plumbing in changes

* Main doesn't need a port

* Split Attach into grpc interface

* Change envvar format

* Type test for attach

* lint

* Reformat python

* Implement provider debug for nodejs

* Fix plugin close

* lint

* Add to CHANGELOG

* Set Kill

Co-authored-by: Daniel Bradley <daniel@pulumi.com>
  • Loading branch information
Frassle and danielrbradley committed Apr 19, 2022
1 parent f629d53 commit c9ba17d
Show file tree
Hide file tree
Showing 21 changed files with 695 additions and 214 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -3,6 +3,9 @@
- [cli] Split invoke request protobufs, as monitors and providers take different arguments.
[#9323](https://github.com/pulumi/pulumi/pull/9323)

- [providers] - gRPC providers can now support an Attach method for debugging. The engine will attach to providers listed in the PULUMI_DEBUG_PROVIDERS environment variable. This should be of the form "providerName:port,otherProvider:port".
[#8979](https://github.com/pulumi/pulumi/pull/8979)

### Bug Fixes

- [cli/plugin] - Dynamic provider binaries will now be found even if pulumi/bin is not on $PATH.
Expand Down
11 changes: 11 additions & 0 deletions pkg/resource/provider/component_provider.go
Expand Up @@ -200,3 +200,14 @@ func (p *componentProvider) Call(ctx context.Context,
// Cancel is invoked by the engine to request that in-flight operations be
// cancelled. This component provider has no long-running work to interrupt,
// so it simply acknowledges the request by returning success.
func (p *componentProvider) Cancel(context.Context, *pbempty.Empty) (*pbempty.Empty, error) {
return &pbempty.Empty{}, nil
}

// Attach connects this already-running provider to the engine: it dials the
// engine's host RPC server at the address carried in the request and installs
// the resulting client as the provider's host.
func (p *componentProvider) Attach(ctx context.Context,
	req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	engineAddress := req.GetAddress()
	client, err := NewHostClient(engineAddress)
	if err != nil {
		return nil, err
	}
	p.host = client
	return &pbempty.Empty{}, nil
}
13 changes: 9 additions & 4 deletions pkg/resource/provider/main.go
Expand Up @@ -43,13 +43,18 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe

// Read the non-flags args and connect to the engine.
args := flag.Args()
var host *HostClient
if len(args) == 0 {
// Start the provider in Attach mode
} else if len(args) == 1 {
var err error
host, err = NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}
} else {
return errors.New("fatal: could not connect to host RPC; missing argument")
}
host, err := NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, nil, []func(*grpc.Server) error{
Expand Down
111 changes: 60 additions & 51 deletions sdk/go/common/resource/plugin/plugin.go
Expand Up @@ -115,6 +115,63 @@ var errRunPolicyModuleNotFound = errors.New("pulumi SDK does not support policy
// errPluginNotFound is returned when we try to execute a plugin but it is not found on disk.
var errPluginNotFound = errors.New("plugin not found")

// dialPlugin opens a gRPC client connection to a plugin listening on
// 127.0.0.1:port and blocks until the connection is actually able to serve
// RPCs (or the pluginRPCConnectionTimeout elapses). bin and prefix are used
// only to label error messages.
func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) {
	// Now that we have the port, go ahead and create a gRPC client connection to it.
	conn, err := grpc.Dial(
		"127.0.0.1:"+port,
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
		rpcutil.GrpcChannelOptions(),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin)
	}

	// Now wait for the gRPC connection to the plugin to become ready.
	// TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior
	// should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime
	// while this bug exists, we'll simply do a bit of waiting of our own up front.
	// Note: the original code discarded the CancelFunc, which go vet flags
	// (lostcancel) and which leaks the timeout's timer; defer it instead.
	timeout, cancel := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout)
	defer cancel()
	for {
		s := conn.GetState()
		if s == connectivity.Ready {
			// The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy
			// method invocation to the server. Until it responds successfully, we can't safely proceed.
		outer:
			for {
				// conn.Invoke replaces the deprecated package-level grpc.Invoke.
				err = conn.Invoke(timeout, "", nil, nil)
				if err == nil {
					break // successful connect
				}

				// We have an error; see if it's a known status and, if so, react appropriately.
				// (Named st rather than status to avoid shadowing the status package.)
				st, ok := status.FromError(err)
				if ok {
					switch st.Code() {
					case codes.Unavailable:
						// The server is unavailable. This is the Linux bug. Wait a little and retry.
						time.Sleep(time.Millisecond * 10)
						continue // keep retrying
					default:
						// Since we sent "" as the method above, this is the expected response. Ready to go.
						break outer
					}
				}

				// Unexpected error; get outta dodge.
				return nil, errors.Wrapf(err, "%v plugin [%v] did not come alive", prefix, bin)
			}
			break
		}
		// Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry.
		if !conn.WaitForStateChange(timeout, s) {
			return nil, errors.Errorf("%v plugin [%v] did not begin responding to RPC connections", prefix, bin)
		}
	}

	return conn, nil
}

func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, options ...otgrpc.Option) (*plugin, error) {
if logging.V(9) {
var argstr string
Expand Down Expand Up @@ -225,57 +282,9 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
plug.stdoutDone = stdoutDone
go runtrace(plug.Stdout, false, stdoutDone)

// Now that we have the port, go ahead and create a gRPC client connection to it.
conn, err := grpc.Dial(
"127.0.0.1:"+port,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
rpcutil.GrpcChannelOptions(),
)
conn, err := dialPlugin(port, bin, prefix)
if err != nil {
return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin)
}

// Now wait for the gRPC connection to the plugin to become ready.
// TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior
// should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime
// while this bug exists, we'll simply do a bit of waiting of our own up front.
timeout, _ := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout)
for {
s := conn.GetState()
if s == connectivity.Ready {
// The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy
// method invocation to the server. Until it responds successfully, we can't safely proceed.
outer:
for {
err = grpc.Invoke(timeout, "", nil, nil, conn)
if err == nil {
break // successful connect
} else {
// We have an error; see if it's a known status and, if so, react appropriately.
status, ok := status.FromError(err)
if ok {
switch status.Code() {
case codes.Unavailable:
// The server is unavailable. This is the Linux bug. Wait a little and retry.
time.Sleep(time.Millisecond * 10)
continue // keep retrying
default:
// Since we sent "" as the method above, this is the expected response. Ready to go.
break outer
}
}

// Unexpected error; get outta dodge.
return nil, errors.Wrapf(err, "%v plugin [%v] did not come alive", prefix, bin)
}
}
break
}
// Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry.
if !conn.WaitForStateChange(timeout, s) {
return nil, errors.Errorf("%v plugin [%v] did not begin responding to RPC connections", prefix, bin)
}
return nil, err
}

// Done; store the connection and return the plugin info.
Expand Down Expand Up @@ -382,7 +391,7 @@ func (p *plugin) Close() error {

result := p.Kill()

// Wait for stdout and stderr to drain.
// Wait for stdout and stderr to drain; if we attached to the plugin we won't have a stdout/err.
if p.stdoutDone != nil {
<-p.stdoutDone
}
Expand Down
9 changes: 9 additions & 0 deletions sdk/go/common/resource/plugin/provider.go
Expand Up @@ -103,6 +103,15 @@ type Provider interface {
SignalCancellation() error
}

// GrpcProvider is an optional extension of Provider implemented by providers
// served over gRPC, adding the ability to attach a running provider process
// to the engine after the fact.
type GrpcProvider interface {
Provider

// Attach triggers an attach for a currently running provider to the engine
// TODO It would be nice if this was a HostClient rather than the string address but due to dependency
// ordering we don't have access to declare that here.
Attach(address string) error
}

// CheckFailure indicates that a call to check failed; it contains the property and reason for the failure.
type CheckFailure struct {
Property resource.PropertyKey // the property that failed checking.
Expand Down
93 changes: 76 additions & 17 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -76,39 +76,81 @@ type provider struct {
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Version,
options map[string]interface{}, disableProviderPreview bool) (Provider, error) {
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
}

contract.Assert(path != "")
// See if this is a provider we just want to attach to
var plug *plugin
var optAttach string
if providersEnvVar, has := os.LookupEnv("PULUMI_DEBUG_PROVIDERS"); has {
for _, provider := range strings.Split(providersEnvVar, ",") {
parts := strings.SplitN(provider, ":", 2)

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
if parts[0] == pkg.String() {
optAttach = parts[1]
break
}
}
}

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (resource)", pkg),
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
prefix := fmt.Sprintf("%v (resource)", pkg)

if optAttach != "" {
conn, err := dialPlugin(optAttach, pkg.String(), prefix)
if err != nil {
return nil, err
}

// Done; store the connection and return the plugin info.
plug = &plugin{
Conn: conn,
// Nothing to kill
Kill: func() error { return nil },
}
} else {
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
}

contract.Assert(path != "")

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
}

plug, err = newPlugin(ctx, ctx.Pwd, path, prefix,
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
}
}

contract.Assertf(plug != nil, "unexpected nil resource plugin for %s", pkg)

legacyPreview := cmdutil.IsTruthy(os.Getenv("PULUMI_LEGACY_PROVIDER_PREVIEW"))

return &provider{
p := &provider{
ctx: ctx,
pkg: pkg,
plug: plug,
clientRaw: pulumirpc.NewResourceProviderClient(plug.Conn),
cfgdone: make(chan bool),
disableProviderPreview: disableProviderPreview,
legacyPreview: legacyPreview,
}, nil
}

// If we just attached (i.e. plugin bin is nil) we need to call attach
if plug.Bin == "" {
err := p.Attach(host.ServerAddr())
if err != nil {
return nil, err
}
}

return p, nil
}

func NewProviderWithClient(ctx *Context, pkg tokens.Package, client pulumirpc.ResourceProviderClient,
Expand Down Expand Up @@ -1462,6 +1504,23 @@ func (p *provider) GetPluginInfo() (workspace.PluginInfo, error) {
}, nil
}

// Attach attaches this plugin to the engine
func (p *provider) Attach(address string) error {
	label := fmt.Sprintf("%s.Attach()", p.label())
	logging.V(7).Infof("%s executing", label)

	// Calling Attach happens immediately after loading, and does not require configuration to proceed.
	// Thus, we access the clientRaw property, rather than calling getClient.
	req := &pulumirpc.PluginAttach{Address: address}
	if _, err := p.clientRaw.Attach(p.requestContext(), req); err != nil {
		converted := rpcerror.Convert(err)
		logging.V(7).Infof("%s failed: err=%v", label, converted.Message())
		return converted
	}
	return nil
}

func (p *provider) SignalCancellation() error {
_, err := p.clientRaw.Cancel(p.requestContext(), &pbempty.Empty{})
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions sdk/go/common/resource/plugin/provider_server.go
Expand Up @@ -143,6 +143,20 @@ func (p *providerServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty)
return &pulumirpc.PluginInfo{Version: info.Version.String()}, nil
}

// Attach implements the Attach RPC by delegating to the wrapped provider,
// when that provider supports attaching.
func (p *providerServer) Attach(ctx context.Context, req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	// NewProviderServer should take a GrpcProvider instead of Provider, but that's a breaking change
	// so for now we type test here
	grpcProvider, ok := p.provider.(GrpcProvider)
	if !ok {
		// Else report this is unsupported
		return nil, status.Error(codes.Unimplemented, "Attach is not yet implemented")
	}
	if err := grpcProvider.Attach(req.GetAddress()); err != nil {
		return nil, err
	}
	return &pbempty.Empty{}, nil
}

func (p *providerServer) Cancel(ctx context.Context, req *pbempty.Empty) (*pbempty.Empty, error) {
if err := p.provider.SignalCancellation(); err != nil {
return nil, err
Expand Down

0 comments on commit c9ba17d

Please sign in to comment.