Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow attaching to running providers #8979

Merged
merged 18 commits into from Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/resource/deploy/builtins.go
Expand Up @@ -215,6 +215,11 @@ func (p *builtinProvider) GetPluginInfo() (workspace.PluginInfo, error) {
return workspace.PluginInfo{}, errors.New("the builtin provider does not report plugin info")
}

// Attach is not supported by the builtin provider. The address argument is ignored and an error is
// always returned, because this method should never be called for the builtin provider.
func (p *builtinProvider) Attach(_ string) error {
	unsupported := errors.New("the builtin provider does not support attach")
	return unsupported
}

func (p *builtinProvider) SignalCancellation() error {
p.cancel()
return nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/resource/deploy/providers/registry.go
Expand Up @@ -466,6 +466,11 @@ func (r *Registry) Call(tok tokens.ModuleMember, args resource.PropertyMap, info
return plugin.CallResult{}, errors.New("the provider registry is not callable")
}

// Attach is not supported by the provider registry. The address argument is ignored and an error is
// always returned, because this method should never be called for the registry.
func (r *Registry) Attach(_ string) error {
	unsupported := errors.New("the provider registry does not support attach")
	return unsupported
}

func (r *Registry) GetPluginInfo() (workspace.PluginInfo, error) {
// return an error: this should not be called for the provider registry
return workspace.PluginInfo{}, errors.New("the provider registry does not report plugin info")
Expand Down
11 changes: 11 additions & 0 deletions pkg/resource/provider/component_provider.go
Expand Up @@ -200,3 +200,14 @@ func (p *componentProvider) Call(ctx context.Context,
// Cancel takes no action for the component provider: both arguments are ignored and an empty
// response is returned with a nil error.
func (p *componentProvider) Cancel(context.Context, *pbempty.Empty) (*pbempty.Empty, error) {
	return new(pbempty.Empty), nil
}

// Attach connects this already-running provider to the engine whose address is carried in req.
// On success the new host client replaces p.host and an empty response is returned; if the client
// cannot be created, the error is returned and p.host is left unchanged.
func (p *componentProvider) Attach(ctx context.Context,
	req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	address := req.GetAddress()
	engine, err := NewHostClient(address)
	if err != nil {
		return nil, err
	}

	p.host = engine
	return new(pbempty.Empty), nil
}
13 changes: 9 additions & 4 deletions pkg/resource/provider/main.go
Expand Up @@ -43,13 +43,18 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe

// Read the non-flags args and connect to the engine.
args := flag.Args()
var host *HostClient
if len(args) == 0 {
// Start the provider in Attach mode
} else if len(args) == 1 {
var err error
host, err = NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}
} else {
return errors.New("fatal: could not connect to host RPC; missing argument")
}
host, err := NewHostClient(args[0])
if err != nil {
return fmt.Errorf("fatal: could not connect to host RPC: %v", err)
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, nil, []func(*grpc.Server) error{
Expand Down
4 changes: 4 additions & 0 deletions sdk/go/common/resource/plugin/provider.go
Expand Up @@ -101,6 +101,10 @@ type Provider interface {
// non-blocking; it is up to the host to decide how long to wait after SignalCancellation is
// called before (e.g.) hard-closing any gRPC connection.
SignalCancellation() error

// Attach asks an already-running provider to attach to the engine listening at the given address.
// TODO: It would be nice if this took a HostClient rather than a string address, but due to dependency
// ordering that type cannot be referenced here.
Attach(address string) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an implementation detail of the gRPC implementation of plugins rather than part of the Plugin type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? I thought about maybe having a GrpcProvider that was just Provider + Attach? Is there a more "go" way to do it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm wondering whether or not we need to expose Attach at all. I think we could just invoke the gRPC method directly here: https://github.com/pulumi/pulumi/pull/8979/files#diff-4372cd77587542c3e09139fb236d0bce7d0ff31c3c81864950974b1e33e5104eR146

}

// CheckFailure indicates that a call to check failed; it contains the property and reason for the failure.
Expand Down
141 changes: 118 additions & 23 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"os"
"strings"
"time"

"github.com/blang/semver"
pbempty "github.com/golang/protobuf/ptypes/empty"
Expand All @@ -29,13 +30,17 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"

"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
Expand Down Expand Up @@ -76,43 +81,116 @@ type provider struct {
// plugin could not be found, or an error occurs while creating the child process, an error is returned.
func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Version,
options map[string]interface{}, disableProviderPreview bool) (Provider, error) {
// NOTE(review): this listing appears to interleave the pre-change and post-change text of the function
// (e.g. the plugin path lookup and the runtime-option environment setup appear both directly below and
// again in the else branch further down) — confirm against the merged file before relying on statement order.
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
} else if path == "" {
return nil, workspace.NewMissingError(workspace.PluginInfo{
Kind: workspace.ResourcePlugin,
Name: string(pkg),
Version: version,
})
}

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
}
// Label passed to newPlugin and used in the error messages below.
prefix := fmt.Sprintf("%v (resource)", pkg)

// See if this is a provider we just want to attach to
// Attach mode is selected by the environment variable PULUMI_PROVIDER_<PKG>, where <PKG> is the upper-cased
// package name; when it is set, its value is used as the port to dial on 127.0.0.1.
var plug *plugin
optAttach, isFound := os.LookupEnv("PULUMI_PROVIDER_" + strings.ToUpper(pkg.String()))
if isFound {
conn, err := grpc.Dial(
"127.0.0.1:"+optAttach,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
rpcutil.GrpcChannelOptions(),
)
if err != nil {
return nil, errors.Wrapf(err, "could not dial plugin [%s:%s] over RPC", prefix, optAttach)
}

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (resource)", pkg),
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
// Now wait for the gRPC connection to the plugin to become ready.
// TODO[pulumi/pulumi#337]: in theory, this should be unnecessary. gRPC's default WaitForReady behavior
// should auto-retry appropriately. On Linux, however, we are observing different behavior. In the meantime
// while this bug exists, we'll simply do a bit of waiting of our own up front.
// NOTE(review): the cancel function returned by context.WithTimeout is discarded here; the context package
// documentation advises calling it so the timer is released as soon as the wait is over rather than only
// when the deadline fires.
timeout, _ := context.WithTimeout(context.Background(), pluginRPCConnectionTimeout)
for {
s := conn.GetState()
if s == connectivity.Ready {
// The connection is supposedly ready; but we will make sure it is *actually* ready by sending a dummy
// method invocation to the server. Until it responds successfully, we can't safely proceed.
outer:
for {
err = grpc.Invoke(timeout, "", nil, nil, conn)
if err == nil {
break // successful connect
} else {
// We have an error; see if it's a known status and, if so, react appropriately.
status, ok := status.FromError(err)
if ok {
switch status.Code() {
case codes.Unavailable:
// The server is unavailable. This is the Linux bug. Wait a little and retry.
time.Sleep(time.Millisecond * 10)
continue // keep retrying
default:
// Since we sent "" as the method above, this is the expected response. Ready to go.
break outer
}
}

// Unexpected error; get outta dodge.
// NOTE(review): conn does not appear to be closed on this error path — confirm whether it should be.
return nil, errors.Wrapf(err, "%s:%s plugin did not come alive", prefix, optAttach)
}
}
// The probe loop above ended without returning, so the server has answered; stop waiting on state changes.
break
}
// Not ready yet; ask the gRPC client APIs to block until the state transitions again so we can retry.
if !conn.WaitForStateChange(timeout, s) {
// NOTE(review): conn does not appear to be closed on this error path either — confirm whether it should be.
return nil, errors.Errorf("%s:%s plugin did not begin responding to RPC connections", prefix, optAttach)
}
}

// Done; store the connection and return the plugin info.
// Only Conn is populated, so plug.Bin keeps its zero value; the attach check near the end of this
// function relies on that.
plug = &plugin{
Conn: conn,
}
} else {
// Load the plugin's path by using the standard workspace logic.
_, path, err := workspace.GetPluginPath(
workspace.ResourcePlugin, strings.Replace(string(pkg), tokens.QNameDelimiter, "_", -1), version)
if err != nil {
return nil, err
} else if path == "" {
return nil, workspace.NewMissingError(workspace.PluginInfo{
Kind: workspace.ResourcePlugin,
Name: string(pkg),
Version: version,
})
}

// Runtime options are passed as environment variables to the provider.
env := os.Environ()
for k, v := range options {
env = append(env, fmt.Sprintf("PULUMI_RUNTIME_%s=%v", strings.ToUpper(k), v))
}

plug, err = newPlugin(ctx, ctx.Pwd, path, prefix,
[]string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans))
if err != nil {
return nil, err
}
}

contract.Assertf(plug != nil, "unexpected nil resource plugin for %s", pkg)

legacyPreview := cmdutil.IsTruthy(os.Getenv("PULUMI_LEGACY_PROVIDER_PREVIEW"))

return &provider{
p := &provider{
ctx: ctx,
pkg: pkg,
plug: plug,
clientRaw: pulumirpc.NewResourceProviderClient(plug.Conn),
cfgdone: make(chan bool),
disableProviderPreview: disableProviderPreview,
legacyPreview: legacyPreview,
}, nil
}

// If we just attached (i.e. the plugin's Bin is empty) we need to call attach
// NOTE(review): the error returned by p.Attach is dropped here — confirm whether a failed attach should
// cause NewProvider to fail.
if plug.Bin == "" {
p.Attach(host.ServerAddr())
}

return p, nil
}

func NewProviderWithClient(ctx *Context, pkg tokens.Package, client pulumirpc.ResourceProviderClient,
Expand Down Expand Up @@ -1468,6 +1546,23 @@ func (p *provider) GetPluginInfo() (workspace.PluginInfo, error) {
}, nil
}

// Attach asks the plugin behind this provider to attach to the engine at the given address.
// If the RPC fails, the error is converted with rpcerror.Convert, logged at verbosity 7, and returned.
func (p *provider) Attach(address string) error {
	label := fmt.Sprintf("%s.Attach()", p.label())
	logging.V(7).Infof("%s executing", label)

	// Attach runs right after the plugin is loaded and needs no provider configuration, so the raw
	// client is used directly instead of going through getClient.
	request := &pulumirpc.PluginAttach{Address: address}
	if _, err := p.clientRaw.Attach(p.requestContext(), request); err != nil {
		converted := rpcerror.Convert(err)
		logging.V(7).Infof("%s failed: err=%v", label, converted.Message())
		return converted
	}
	return nil
}

func (p *provider) SignalCancellation() error {
_, err := p.clientRaw.Cancel(p.requestContext(), &pbempty.Empty{})
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions sdk/go/common/resource/plugin/provider_server.go
Expand Up @@ -143,6 +143,14 @@ func (p *providerServer) GetPluginInfo(ctx context.Context, req *pbempty.Empty)
return &pulumirpc.PluginInfo{Version: info.Version.String()}, nil
}

// Attach forwards an attach request to the wrapped provider, passing along the address carried in req.
// It returns an empty response on success and the provider's error otherwise.
func (p *providerServer) Attach(ctx context.Context, req *pulumirpc.PluginAttach) (*pbempty.Empty, error) {
	if err := p.provider.Attach(req.GetAddress()); err != nil {
		return nil, err
	}
	return &pbempty.Empty{}, nil
}

func (p *providerServer) Cancel(ctx context.Context, req *pbempty.Empty) (*pbempty.Empty, error) {
if err := p.provider.SignalCancellation(); err != nil {
return nil, err
Expand Down