Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow attaching to running providers #8979

Merged
merged 18 commits into from Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -12,5 +12,8 @@
- [cli] - Warn when `additionalSecretOutputs` is used to mark the `id` property as secret.
[#9360](https://github.com/pulumi/pulumi/pull/9360)

- [providers] - gRPC providers can now support an Attach method for debugging. The engine will attach to providers listed in the `PULUMI_DEBUG_PROVIDERS` environment variable. This should be of the form `"providerName:port,otherProvider:port"`.
[#8979](https://github.com/pulumi/pulumi/pull/8979)

### Bug Fixes

11 changes: 11 additions & 0 deletions pkg/resource/provider/component_provider.go
Expand Up @@ -200,3 +200,14 @@ func (p *componentProvider) Call(ctx context.Context,
// Cancel is a no-op for component providers: it always reports success and
// performs no shutdown work.
func (p *componentProvider) Cancel(context.Context, *pbempty.Empty) (*pbempty.Empty, error) {
	return &pbempty.Empty{}, nil
}

// Attach attaches to the engine for an already running provider. The engine
// supplies its RPC address in the request; we dial it and store the resulting
// host client so subsequent calls can talk back to the engine.
func (p *componentProvider) Attach(ctx context.Context,
	req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	client, err := NewHostClient(req.GetAddress())
	if err != nil {
		return nil, err
	}
	p.host = client
	return &pbempty.Empty{}, nil
}
13 changes: 9 additions & 4 deletions pkg/resource/provider/main.go
Expand Up @@ -43,13 +43,18 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe

// Read the non-flags args and connect to the engine.
args := flag.Args()
var host *HostClient
if len(args) == 0 {
// Start the provider in Attach mode
} else if len(args) == 1 {
var err error
host, err = NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}
} else {
return errors.New("fatal: could not connect to host RPC; missing argument")
}
host, err := NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, nil, []func(*grpc.Server) error{
Expand Down
144 changes: 78 additions & 66 deletions sdk/go/common/resource/plugin/plugin.go
Expand Up @@ -114,6 +114,63 @@ var errRunPolicyModuleNotFound = errors.New("pulumi SDK does not support policy
// errPluginNotFound is returned when we try to execute a plugin but it is not found on disk.
var errPluginNotFound = errors.New("plugin not found")

// dialPlugin creates a gRPC client connection to a plugin listening on the
// given local port and blocks until the connection is actually able to serve
// requests. bin and prefix are used only to construct error messages.
func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) {
	// Now that we have the port, go ahead and create a gRPC client connection to it.
	conn, err := grpc.Dial(
		"127.0.0.1:"+port,
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
		rpcutil.GrpcChannelOptions(),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin)
	}

	// Now wait for the gRPC connection to the plugin to become ready.
	// TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior
	// should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime
	// while this bug exists, we'll simply do a bit of waiting of our own up front.
	// Keep the cancel func and release the timeout's resources when we're done waiting.
	dialCtx, cancel := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout)
	defer cancel()
	for {
		s := conn.GetState()
		if s == connectivity.Ready {
			// The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy
			// method invocation to the server. Until it responds successfully, we can't safely proceed.
		outer:
			for {
				err = conn.Invoke(dialCtx, "", nil, nil)
				if err == nil {
					break // successful connect
				}

				// We have an error; see if it's a known status and, if so, react appropriately.
				// (Use `st`, not `status`, so we don't shadow the status package.)
				st, ok := status.FromError(err)
				if ok {
					switch st.Code() {
					case codes.Unavailable:
						// The server is unavailable. This is the Linux bug. Wait a little and retry.
						time.Sleep(time.Millisecond * 10)
						continue // keep retrying
					default:
						// Since we sent "" as the method above, this is the expected response. Ready to go.
						break outer
					}
				}

				// Unexpected error; close the now-useless connection and get outta dodge.
				_ = conn.Close() // best-effort cleanup; the Invoke error is what matters
				return nil, errors.Wrapf(err, "%v plugin [%v] did not come alive", prefix, bin)
			}
			break
		}
		// Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry.
		if !conn.WaitForStateChange(dialCtx, s) {
			_ = conn.Close() // best-effort cleanup; report the timeout to the caller
			return nil, errors.Errorf("%v plugin [%v] did not begin responding to RPC connections", prefix, bin)
		}
	}

	return conn, nil
}

func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, options ...otgrpc.Option) (*plugin, error) {
if logging.V(9) {
var argstr string
Expand Down Expand Up @@ -224,57 +281,9 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
plug.stdoutDone = stdoutDone
go runtrace(plug.Stdout, false, stdoutDone)

// Now that we have the port, go ahead and create a gRPC client connection to it.
conn, err := grpc.Dial(
"127.0.0.1:"+port,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
rpcutil.GrpcChannelOptions(),
)
conn, err := dialPlugin(port, bin, prefix)
if err != nil {
return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin)
}

// Now wait for the gRPC connection to the plugin to become ready.
// TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior
// should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime
// while this bug exists, we'll simply do a bit of waiting of our own up front.
timeout, _ := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout)
for {
s := conn.GetState()
if s == connectivity.Ready {
// The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy
// method invocation to the server. Until it responds successfully, we can't safely proceed.
outer:
for {
err = grpc.Invoke(timeout, "", nil, nil, conn)
if err == nil {
break // successful connect
} else {
// We have an error; see if it's a known status and, if so, react appropriately.
status, ok := status.FromError(err)
if ok {
switch status.Code() {
case codes.Unavailable:
// The server is unavailable. This is the Linux bug. Wait a little and retry.
time.Sleep(time.Millisecond * 10)
continue // keep retrying
default:
// Since we sent "" as the method above, this is the expected response. Ready to go.
break outer
}
}

// Unexpected error; get outta dodge.
return nil, errors.Wrapf(err, "%v plugin [%v] did not come alive", prefix, bin)
}
}
break
}
// Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry.
if !conn.WaitForStateChange(timeout, s) {
return nil, errors.Errorf("%v plugin [%v] did not begin responding to RPC connections", prefix, bin)
}
return nil, err
}

// Done; store the connection and return the plugin info.
Expand Down Expand Up @@ -363,24 +372,27 @@ func (p *plugin) Close() error {

var result error

// On each platform, plugins are not loaded directly, instead a shell launches each plugin as a child process, so
// instead we need to kill all the children of the PID we have recorded, as well. Otherwise we will block waiting
// for the child processes to close.
if err := cmdutil.KillChildren(p.Proc.Pid); err != nil {
result = multierror.Append(result, err)
}
// If we attached to the plugin we won't have a process or std/err
if p.Proc != nil {
// On each platform, plugins are not loaded directly, instead a shell launches each plugin as a child process, so
// instead we need to kill all the children of the PID we have recorded, as well. Otherwise we will block waiting
// for the child processes to close.
if err := cmdutil.KillChildren(p.Proc.Pid); err != nil {
result = multierror.Append(result, err)
}

// IDEA: consider a more graceful termination than just SIGKILL.
if err := p.Proc.Kill(); err != nil {
result = multierror.Append(result, err)
}
// IDEA: consider a more graceful termination than just SIGKILL.
if err := p.Proc.Kill(); err != nil {
result = multierror.Append(result, err)
}

// Wait for stdout and stderr to drain.
if p.stdoutDone != nil {
<-p.stdoutDone
}
if p.stderrDone != nil {
<-p.stderrDone
// Wait for stdout and stderr to drain.
if p.stdoutDone != nil {
<-p.stdoutDone
}
if p.stderrDone != nil {
<-p.stderrDone
}
}

return result
Expand Down
9 changes: 9 additions & 0 deletions sdk/go/common/resource/plugin/provider.go
Expand Up @@ -103,6 +103,15 @@ type Provider interface {
SignalCancellation() error
}

type GrpcProvider interface {
Provider

// Attach triggers an attach for a currently running provider to the engine
// TODO It would be nice if this was a HostClient rather than the string address but due to dependency
// ordering we don't have access to declare that here.
Attach(address string) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an implementation detail of the gRPC implementation of plugins rather than part of the Plugin type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? I thought about maybe having a GrpcProvider that was just Provider + Attach? Is there a more "go" way to do it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm wondering whether or not we need to expose Attach at all. I think we could just invoke the gRPC method directly here: https://github.com/pulumi/pulumi/pull/8979/files#diff-4372cd77587542c3e09139fb236d0bce7d0ff31c3c81864950974b1e33e5104eR146

}

// CheckFailure indicates that a call to check failed; it contains the property and reason for the failure.
type CheckFailure struct {
Property resource.PropertyKey // the property that failed checking.
Expand Down
91 changes: 74 additions & 17 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -76,39 +76,79 @@ type provider struct {
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Version,
options map[string]interface{}, disableProviderPreview bool) (Provider, error) {
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
}

contract.Assert(path != "")
// See if this is a provider we just want to attach to
var plug *plugin
var optAttach string
if providersEnvVar, has := os.LookupEnv("PULUMI_DEBUG_PROVIDERS"); has {
for _, provider := range strings.Split(providersEnvVar, ",") {
parts := strings.SplitN(provider, ":", 2)

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
if parts[0] == pkg.String() {
optAttach = parts[1]
break
}
}
}

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (resource)", pkg),
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
prefix := fmt.Sprintf("%v (resource)", pkg)

if optAttach != "" {
conn, err := dialPlugin(optAttach, pkg.String(), prefix)
if err != nil {
return nil, err
}

// Done; store the connection and return the plugin info.
plug = &plugin{
Conn: conn,
}
} else {
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
}

contract.Assert(path != "")

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
}

plug, err = newPlugin(ctx, ctx.Pwd, path, prefix,
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
}
}

contract.Assertf(plug != nil, "unexpected nil resource plugin for %s", pkg)

legacyPreview := cmdutil.IsTruthy(os.Getenv("PULUMI_LEGACY_PROVIDER_PREVIEW"))

return &provider{
p := &provider{
ctx: ctx,
pkg: pkg,
plug: plug,
clientRaw: pulumirpc.NewResourceProviderClient(plug.Conn),
cfgdone: make(chan bool),
disableProviderPreview: disableProviderPreview,
legacyPreview: legacyPreview,
}, nil
}

// If we just attached (i.e. plugin bin is nil) we need to call attach
if plug.Bin == "" {
err := p.Attach(host.ServerAddr())
if err != nil {
return nil, err
}
}

return p, nil
}

func NewProviderWithClient(ctx *Context, pkg tokens.Package, client pulumirpc.ResourceProviderClient,
Expand Down Expand Up @@ -1464,6 +1504,23 @@ func (p *provider) GetPluginInfo() (workspace.PluginInfo, error) {
}, nil
}

// Attach attaches this plugin to the engine at the given RPC address.
func (p *provider) Attach(address string) error {
	label := fmt.Sprintf("%s.Attach()", p.label())
	logging.V(7).Infof("%s executing", label)

	// Calling Attach happens immediately after loading, and does not require configuration to proceed.
	// Thus, we access the clientRaw property, rather than calling getClient.
	req := &pulumirpc.PluginAttach{Address: address}
	if _, err := p.clientRaw.Attach(p.requestContext(), req); err != nil {
		rpcError := rpcerror.Convert(err)
		logging.V(7).Infof("%s failed: err=%v", label, rpcError.Message())
		return rpcError
	}

	return nil
}

func (p *provider) SignalCancellation() error {
_, err := p.clientRaw.Cancel(p.requestContext(), &pbempty.Empty{})
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions sdk/go/common/resource/plugin/provider_server.go
Expand Up @@ -143,6 +143,20 @@ func (p *providerServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty)
return &pulumirpc.PluginInfo{Version: info.Version.String()}, nil
}

// Attach forwards the engine's attach request to the underlying provider.
// NewProviderServer should take a GrpcProvider instead of Provider, but that's a breaking change
// so for now we type test here and report Unimplemented for providers that don't support it.
func (p *providerServer) Attach(ctx context.Context, req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	grpcProvider, ok := p.provider.(GrpcProvider)
	if !ok {
		// The wrapped provider doesn't support attaching; report this as unsupported.
		return nil, status.Error(codes.Unimplemented, "Attach is not yet implemented")
	}
	if err := grpcProvider.Attach(req.GetAddress()); err != nil {
		return nil, err
	}
	return &pbempty.Empty{}, nil
}

func (p *providerServer) Cancel(ctx context.Context, req *pbempty.Empty) (*pbempty.Empty, error) {
if err := p.provider.SignalCancellation(); err != nil {
return nil, err
Expand Down