Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor.Invoke and Provider.Invoke take different arguments #9323

Merged
merged 14 commits into from Apr 14, 2022
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -15,6 +15,9 @@
- [cli] Display richer diffs for texutal property values.
[#9376](https://github.com/pulumi/pulumi/pull/9376)

- [cli] Split invoke request protobufs, as monitors and providers take different arguments.
[#9323](https://github.com/pulumi/pulumi/pull/9323)

### Bug Fixes

- [codegen/node] - Fix an issue with escaping deprecation messages.
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/deploy/deploytest/resourcemonitor.go
Expand Up @@ -284,7 +284,7 @@ func (rm *ResourceMonitor) Invoke(tok tokens.ModuleMember, inputs resource.Prope
}

// submit request
resp, err := rm.resmon.Invoke(context.Background(), &pulumirpc.InvokeRequest{
resp, err := rm.resmon.Invoke(context.Background(), &pulumirpc.ResourceInvokeRequest{
Tok: string(tok),
Provider: provider,
Args: ins,
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/deploy/source.go
Expand Up @@ -62,7 +62,7 @@ type SourceResourceMonitor interface {

Address() string
Cancel() error
Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error)
Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error)
Call(ctx context.Context, req *pulumirpc.CallRequest) (*pulumirpc.CallResponse, error)
ReadResource(ctx context.Context,
req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error)
Expand Down
8 changes: 4 additions & 4 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -634,7 +634,7 @@ func (rm *resmon) SupportsFeature(ctx context.Context,
}

// Invoke performs an invocation of a member located in a resource provider.
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
// Fetch the token and load up the resource provider if necessary.
tok := tokens.ModuleMember(req.GetTok())
providerReq, err := parseProviderRequest(tok.Package(), req.GetVersion(), req.GetPluginDownloadURL())
Expand Down Expand Up @@ -668,7 +668,7 @@ func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pu
mret, err := plugin.MarshalProperties(ret, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepResources: true,
KeepResources: req.GetAcceptResources(),
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
Expand Down Expand Up @@ -770,7 +770,7 @@ func (rm *resmon) Call(ctx context.Context, req *pulumirpc.CallRequest) (*pulumi
}

func (rm *resmon) StreamInvoke(
req *pulumirpc.InvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok)
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func decorateResourceSpans(span opentracing.Span, method string, req, resp inter

switch method {
case "/pulumirpc.ResourceMonitor/Invoke":
span.SetTag("pulumi-decorator", req.(*pulumirpc.InvokeRequest).Tok)
span.SetTag("pulumi-decorator", req.(*pulumirpc.ResourceInvokeRequest).Tok)
case "/pulumirpc.ResourceMonitor/ReadResource":
span.SetTag("pulumi-decorator", req.(*pulumirpc.ReadResourceRequest).Type)
case "/pulumirpc.ResourceMonitor/RegisterResource":
Expand Down
11 changes: 8 additions & 3 deletions pkg/resource/deploy/source_query.go
Expand Up @@ -321,7 +321,9 @@ func (rm *queryResmon) Cancel() error {
}

// Invoke performs an invocation of a member located in a resource provider.
func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
func (rm *queryResmon) Invoke(
ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("QueryResourceMonitor.Invoke(%s)", tok)

Expand Down Expand Up @@ -372,7 +374,7 @@ func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest)
}

func (rm *queryResmon) StreamInvoke(
req *pulumirpc.InvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("QueryResourceMonitor.StreamInvoke(%s)", tok)
Expand All @@ -396,7 +398,10 @@ func (rm *queryResmon) StreamInvoke(
// streaming operation completes!
logging.V(5).Infof("QueryResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args))
failures, err := prov.StreamInvoke(tok, args, func(event resource.PropertyMap) error {
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{Label: label, KeepUnknowns: true})
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{Label: label,
KeepUnknowns: true,
KeepResources: req.GetAcceptResources(),
})
if err != nil {
return fmt.Errorf("failed to marshal return: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/deploy/source_query_test.go
Expand Up @@ -184,7 +184,7 @@ func (rm *mockQueryResmon) Cancel() error {
return nil
}
func (rm *mockQueryResmon) Invoke(ctx context.Context,
req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {

panic("not implemented")
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/dotnet/Pulumi/Deployment/Deployment_Invoke.cs
Expand Up @@ -105,7 +105,7 @@ private async Task<SerializationResult> InvokeRawAsync(string token, Serializati

var provider = await ProviderResource.RegisterAsync(GetProvider(token, options)).ConfigureAwait(false);

var result = await this.Monitor.InvokeAsync(new InvokeRequest
var result = await this.Monitor.InvokeAsync(new ResourceInvokeRequest
{
Tok = token,
Provider = provider ?? "",
Expand Down
12 changes: 6 additions & 6 deletions sdk/dotnet/Pulumi/Deployment/GrpcMonitor.cs
Expand Up @@ -29,7 +29,7 @@ public GrpcMonitor(string monitorAddress)
// A channel already exists for this address
this._client = new ResourceMonitor.ResourceMonitorClient(monitorChannel);
}
else
else
{
lock (_channelsLock)
{
Expand All @@ -38,7 +38,7 @@ public GrpcMonitor(string monitorAddress)
// A channel already exists for this address
this._client = new ResourceMonitor.ResourceMonitorClient(monitorChannel);
}
else
else
{
// Inititialize the monitor channel once for this monitor address
var channel = GrpcChannel.ForAddress(new Uri($"http://{monitorAddress}"), new GrpcChannelOptions
Expand All @@ -54,22 +54,22 @@ public GrpcMonitor(string monitorAddress)
}
}
}

public async Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest request)
=> await this._client.SupportsFeatureAsync(request);

public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
public async Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request)
=> await this._client.InvokeAsync(request);

public async Task<CallResponse> CallAsync(CallRequest request)
=> await this._client.CallAsync(request);

public async Task<ReadResourceResponse> ReadResourceAsync(Resource resource, ReadResourceRequest request)
=> await this._client.ReadResourceAsync(request);

public async Task<RegisterResourceResponse> RegisterResourceAsync(Resource resource, RegisterResourceRequest request)
=> await this._client.RegisterResourceAsync(request);

public async Task RegisterResourceOutputsAsync(RegisterResourceOutputsRequest request)
=> await this._client.RegisterResourceOutputsAsync(request);
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/dotnet/Pulumi/Deployment/IMonitor.cs
Expand Up @@ -9,14 +9,14 @@ internal interface IMonitor
{
Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest request);

Task<InvokeResponse> InvokeAsync(InvokeRequest request);
Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request);

Task<CallResponse> CallAsync(CallRequest request);

Task<ReadResourceResponse> ReadResourceAsync(Resource resource, ReadResourceRequest request);

Task<RegisterResourceResponse> RegisterResourceAsync(Resource resource, RegisterResourceRequest request);

Task RegisterResourceOutputsAsync(RegisterResourceOutputsRequest request);
}
}
4 changes: 2 additions & 2 deletions sdk/dotnet/Pulumi/Testing/MockMonitor.cs
Expand Up @@ -30,7 +30,7 @@ public Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest
return Task.FromResult(new SupportsFeatureResponse { HasSupport = hasSupport });
}

public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
public async Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request)
{
var args = ToDictionary(request.Args);

Expand All @@ -47,7 +47,7 @@ public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
}
return new InvokeResponse { Return = await SerializeAsync(registeredResource).ConfigureAwait(false) };
}

var result = await _mocks.CallAsync(new MockCallArgs
{
Token = request.Tok,
Expand Down
10 changes: 4 additions & 6 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -1221,9 +1221,8 @@ func (p *provider) Invoke(tok tokens.ModuleMember, args resource.PropertyMap) (r
}

resp, err := client.Invoke(p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
Tok: string(tok),
Args: margs,
})
if err != nil {
rpcError := rpcerror.Convert(err)
Expand Down Expand Up @@ -1286,9 +1285,8 @@ func (p *provider) StreamInvoke(

streamClient, err := client.StreamInvoke(
p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
Tok: string(tok),
Args: margs,
})
if err != nil {
rpcError := rpcerror.Convert(err)
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/pulumi/context.go
Expand Up @@ -282,7 +282,7 @@ func (ctx *Context) Invoke(tok string, args interface{}, result interface{}, opt

// Now, invoke the RPC to the provider synchronously.
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.InvokeRequest{
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
Tok: tok,
Args: rpcArgs,
Provider: providerRef,
Expand Down Expand Up @@ -699,7 +699,7 @@ func (ctx *Context) getResource(urn string) (*pulumirpc.RegisterResourceResponse

tok := "pulumi:pulumi:getResource"
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.InvokeRequest{
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
Tok: "pulumi:pulumi:getResource",
Args: rpcArgs,
})
Expand Down
6 changes: 3 additions & 3 deletions sdk/go/pulumi/mocks.go
Expand Up @@ -88,7 +88,7 @@ func (m *mockMonitor) SupportsFeature(ctx context.Context, in *pulumirpc.Support
}, nil
}

func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,
func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (*pulumirpc.InvokeResponse, error) {

args, err := plugin.UnmarshalProperties(in.GetArgs(), plugin.MarshalOptions{
Expand Down Expand Up @@ -128,7 +128,7 @@ func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,

result, err := plugin.MarshalProperties(resultV, plugin.MarshalOptions{
KeepSecrets: true,
KeepResources: true,
KeepResources: in.GetAcceptResources(),
})
if err != nil {
return nil, err
Expand All @@ -139,7 +139,7 @@ func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,
}, nil
}

func (m *mockMonitor) StreamInvoke(ctx context.Context, in *pulumirpc.InvokeRequest,
func (m *mockMonitor) StreamInvoke(ctx context.Context, in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (pulumirpc.ResourceMonitor_StreamInvokeClient, error) {

panic("not implemented")
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/pulumi/resource_test.go
Expand Up @@ -549,13 +549,13 @@ func (i *interceptingResourceMonitor) SupportsFeature(ctx context.Context,
}

func (i *interceptingResourceMonitor) Invoke(ctx context.Context,
in *pulumirpc.InvokeRequest,
in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (*pulumirpc.InvokeResponse, error) {
return i.inner.Invoke(ctx, in, opts...)
}

func (i *interceptingResourceMonitor) StreamInvoke(ctx context.Context,
in *pulumirpc.InvokeRequest,
in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (pulumirpc.ResourceMonitor_StreamInvokeClient, error) {
return i.inner.StreamInvoke(ctx, in, opts...)
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/nodejs/cmd/pulumi-language-nodejs/proxy.go
Expand Up @@ -101,7 +101,7 @@ func servePipes(ctx context.Context, pipes pipes, target pulumirpc.ResourceMonit

// decode and dispatch the request
logging.V(10).Infof("Sync invoke: Unmarshalling request")
var req pulumirpc.InvokeRequest
var req pulumirpc.ResourceInvokeRequest
if err := pbcodec.Unmarshal(reqBytes, &req); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading full from pipe: %s\n", err)
return err
Expand Down Expand Up @@ -187,12 +187,12 @@ type monitorProxy struct {
}

func (p *monitorProxy) Invoke(
ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
return p.target.Invoke(ctx, req)
}

func (p *monitorProxy) StreamInvoke(
req *pulumirpc.InvokeRequest, server pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, server pulumirpc.ResourceMonitor_StreamInvokeServer) error {

client, err := p.target.StreamInvoke(context.Background(), req)
if err != nil {
Expand Down