Skip to content

Commit

Permalink
Monitor.Invoke and Provider.Invoke take different arguments (#9323)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frassle committed Apr 14, 2022
1 parent da23a89 commit 32dcaa5
Show file tree
Hide file tree
Showing 29 changed files with 917 additions and 573 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
@@ -1,3 +1,6 @@
### Improvements

- [cli] Split invoke request protobufs, as monitors and providers take different arguments.
[#9323](https://github.com/pulumi/pulumi/pull/9323)

### Bug Fixes
2 changes: 1 addition & 1 deletion pkg/resource/deploy/deploytest/resourcemonitor.go
Expand Up @@ -284,7 +284,7 @@ func (rm *ResourceMonitor) Invoke(tok tokens.ModuleMember, inputs resource.Prope
}

// submit request
resp, err := rm.resmon.Invoke(context.Background(), &pulumirpc.InvokeRequest{
resp, err := rm.resmon.Invoke(context.Background(), &pulumirpc.ResourceInvokeRequest{
Tok: string(tok),
Provider: provider,
Args: ins,
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/deploy/source.go
Expand Up @@ -62,7 +62,7 @@ type SourceResourceMonitor interface {

Address() string
Cancel() error
Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error)
Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error)
Call(ctx context.Context, req *pulumirpc.CallRequest) (*pulumirpc.CallResponse, error)
ReadResource(ctx context.Context,
req *pulumirpc.ReadResourceRequest) (*pulumirpc.ReadResourceResponse, error)
Expand Down
8 changes: 4 additions & 4 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -634,7 +634,7 @@ func (rm *resmon) SupportsFeature(ctx context.Context,
}

// Invoke performs an invocation of a member located in a resource provider.
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
// Fetch the token and load up the resource provider if necessary.
tok := tokens.ModuleMember(req.GetTok())
providerReq, err := parseProviderRequest(tok.Package(), req.GetVersion(), req.GetPluginDownloadURL())
Expand Down Expand Up @@ -668,7 +668,7 @@ func (rm *resmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pu
mret, err := plugin.MarshalProperties(ret, plugin.MarshalOptions{
Label: label,
KeepUnknowns: true,
KeepResources: true,
KeepResources: req.GetAcceptResources(),
})
if err != nil {
return nil, fmt.Errorf("failed to marshal %v return: %w", tok, err)
Expand Down Expand Up @@ -770,7 +770,7 @@ func (rm *resmon) Call(ctx context.Context, req *pulumirpc.CallRequest) (*pulumi
}

func (rm *resmon) StreamInvoke(
req *pulumirpc.InvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("ResourceMonitor.StreamInvoke(%s)", tok)
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func decorateResourceSpans(span opentracing.Span, method string, req, resp inter

switch method {
case "/pulumirpc.ResourceMonitor/Invoke":
span.SetTag("pulumi-decorator", req.(*pulumirpc.InvokeRequest).Tok)
span.SetTag("pulumi-decorator", req.(*pulumirpc.ResourceInvokeRequest).Tok)
case "/pulumirpc.ResourceMonitor/ReadResource":
span.SetTag("pulumi-decorator", req.(*pulumirpc.ReadResourceRequest).Type)
case "/pulumirpc.ResourceMonitor/RegisterResource":
Expand Down
11 changes: 8 additions & 3 deletions pkg/resource/deploy/source_query.go
Expand Up @@ -321,7 +321,9 @@ func (rm *queryResmon) Cancel() error {
}

// Invoke performs an invocation of a member located in a resource provider.
func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
func (rm *queryResmon) Invoke(
ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("QueryResourceMonitor.Invoke(%s)", tok)

Expand Down Expand Up @@ -372,7 +374,7 @@ func (rm *queryResmon) Invoke(ctx context.Context, req *pulumirpc.InvokeRequest)
}

func (rm *queryResmon) StreamInvoke(
req *pulumirpc.InvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, stream pulumirpc.ResourceMonitor_StreamInvokeServer) error {

tok := tokens.ModuleMember(req.GetTok())
label := fmt.Sprintf("QueryResourceMonitor.StreamInvoke(%s)", tok)
Expand All @@ -396,7 +398,10 @@ func (rm *queryResmon) StreamInvoke(
// streaming operation completes!
logging.V(5).Infof("QueryResourceMonitor.StreamInvoke received: tok=%v #args=%v", tok, len(args))
failures, err := prov.StreamInvoke(tok, args, func(event resource.PropertyMap) error {
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{Label: label, KeepUnknowns: true})
mret, err := plugin.MarshalProperties(event, plugin.MarshalOptions{Label: label,
KeepUnknowns: true,
KeepResources: req.GetAcceptResources(),
})
if err != nil {
return fmt.Errorf("failed to marshal return: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/deploy/source_query_test.go
Expand Up @@ -184,7 +184,7 @@ func (rm *mockQueryResmon) Cancel() error {
return nil
}
func (rm *mockQueryResmon) Invoke(ctx context.Context,
req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {

panic("not implemented")
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/dotnet/Pulumi/Deployment/Deployment_Invoke.cs
Expand Up @@ -105,7 +105,7 @@ private async Task<SerializationResult> InvokeRawAsync(string token, Serializati

var provider = await ProviderResource.RegisterAsync(GetProvider(token, options)).ConfigureAwait(false);

var result = await this.Monitor.InvokeAsync(new InvokeRequest
var result = await this.Monitor.InvokeAsync(new ResourceInvokeRequest
{
Tok = token,
Provider = provider ?? "",
Expand Down
12 changes: 6 additions & 6 deletions sdk/dotnet/Pulumi/Deployment/GrpcMonitor.cs
Expand Up @@ -29,7 +29,7 @@ public GrpcMonitor(string monitorAddress)
// A channel already exists for this address
this._client = new ResourceMonitor.ResourceMonitorClient(monitorChannel);
}
else
else
{
lock (_channelsLock)
{
Expand All @@ -38,7 +38,7 @@ public GrpcMonitor(string monitorAddress)
// A channel already exists for this address
this._client = new ResourceMonitor.ResourceMonitorClient(monitorChannel);
}
else
else
{
// Inititialize the monitor channel once for this monitor address
var channel = GrpcChannel.ForAddress(new Uri($"http://{monitorAddress}"), new GrpcChannelOptions
Expand All @@ -54,22 +54,22 @@ public GrpcMonitor(string monitorAddress)
}
}
}

public async Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest request)
=> await this._client.SupportsFeatureAsync(request);

public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
public async Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request)
=> await this._client.InvokeAsync(request);

public async Task<CallResponse> CallAsync(CallRequest request)
=> await this._client.CallAsync(request);

public async Task<ReadResourceResponse> ReadResourceAsync(Resource resource, ReadResourceRequest request)
=> await this._client.ReadResourceAsync(request);

public async Task<RegisterResourceResponse> RegisterResourceAsync(Resource resource, RegisterResourceRequest request)
=> await this._client.RegisterResourceAsync(request);

public async Task RegisterResourceOutputsAsync(RegisterResourceOutputsRequest request)
=> await this._client.RegisterResourceOutputsAsync(request);
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/dotnet/Pulumi/Deployment/IMonitor.cs
Expand Up @@ -9,14 +9,14 @@ internal interface IMonitor
{
Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest request);

Task<InvokeResponse> InvokeAsync(InvokeRequest request);
Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request);

Task<CallResponse> CallAsync(CallRequest request);

Task<ReadResourceResponse> ReadResourceAsync(Resource resource, ReadResourceRequest request);

Task<RegisterResourceResponse> RegisterResourceAsync(Resource resource, RegisterResourceRequest request);

Task RegisterResourceOutputsAsync(RegisterResourceOutputsRequest request);
}
}
4 changes: 2 additions & 2 deletions sdk/dotnet/Pulumi/Testing/MockMonitor.cs
Expand Up @@ -30,7 +30,7 @@ public Task<SupportsFeatureResponse> SupportsFeatureAsync(SupportsFeatureRequest
return Task.FromResult(new SupportsFeatureResponse { HasSupport = hasSupport });
}

public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
public async Task<InvokeResponse> InvokeAsync(ResourceInvokeRequest request)
{
var args = ToDictionary(request.Args);

Expand All @@ -47,7 +47,7 @@ public async Task<InvokeResponse> InvokeAsync(InvokeRequest request)
}
return new InvokeResponse { Return = await SerializeAsync(registeredResource).ConfigureAwait(false) };
}

var result = await _mocks.CallAsync(new MockCallArgs
{
Token = request.Tok,
Expand Down
10 changes: 4 additions & 6 deletions sdk/go/common/resource/plugin/provider_plugin.go
Expand Up @@ -1221,9 +1221,8 @@ func (p *provider) Invoke(tok tokens.ModuleMember, args resource.PropertyMap) (r
}

resp, err := client.Invoke(p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
Tok: string(tok),
Args: margs,
})
if err != nil {
rpcError := rpcerror.Convert(err)
Expand Down Expand Up @@ -1286,9 +1285,8 @@ func (p *provider) StreamInvoke(

streamClient, err := client.StreamInvoke(
p.requestContext(), &pulumirpc.InvokeRequest{
Tok: string(tok),
Args: margs,
AcceptResources: p.acceptResources,
Tok: string(tok),
Args: margs,
})
if err != nil {
rpcError := rpcerror.Convert(err)
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/pulumi/context.go
Expand Up @@ -282,7 +282,7 @@ func (ctx *Context) Invoke(tok string, args interface{}, result interface{}, opt

// Now, invoke the RPC to the provider synchronously.
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.InvokeRequest{
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
Tok: tok,
Args: rpcArgs,
Provider: providerRef,
Expand Down Expand Up @@ -699,7 +699,7 @@ func (ctx *Context) getResource(urn string) (*pulumirpc.RegisterResourceResponse

tok := "pulumi:pulumi:getResource"
logging.V(9).Infof("Invoke(%s, #args=%d): RPC call being made synchronously", tok, len(resolvedArgsMap))
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.InvokeRequest{
resp, err := ctx.monitor.Invoke(ctx.ctx, &pulumirpc.ResourceInvokeRequest{
Tok: "pulumi:pulumi:getResource",
Args: rpcArgs,
})
Expand Down
6 changes: 3 additions & 3 deletions sdk/go/pulumi/mocks.go
Expand Up @@ -88,7 +88,7 @@ func (m *mockMonitor) SupportsFeature(ctx context.Context, in *pulumirpc.Support
}, nil
}

func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,
func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (*pulumirpc.InvokeResponse, error) {

args, err := plugin.UnmarshalProperties(in.GetArgs(), plugin.MarshalOptions{
Expand Down Expand Up @@ -128,7 +128,7 @@ func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,

result, err := plugin.MarshalProperties(resultV, plugin.MarshalOptions{
KeepSecrets: true,
KeepResources: true,
KeepResources: in.GetAcceptResources(),
})
if err != nil {
return nil, err
Expand All @@ -139,7 +139,7 @@ func (m *mockMonitor) Invoke(ctx context.Context, in *pulumirpc.InvokeRequest,
}, nil
}

func (m *mockMonitor) StreamInvoke(ctx context.Context, in *pulumirpc.InvokeRequest,
func (m *mockMonitor) StreamInvoke(ctx context.Context, in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (pulumirpc.ResourceMonitor_StreamInvokeClient, error) {

panic("not implemented")
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/pulumi/resource_test.go
Expand Up @@ -549,13 +549,13 @@ func (i *interceptingResourceMonitor) SupportsFeature(ctx context.Context,
}

func (i *interceptingResourceMonitor) Invoke(ctx context.Context,
in *pulumirpc.InvokeRequest,
in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (*pulumirpc.InvokeResponse, error) {
return i.inner.Invoke(ctx, in, opts...)
}

func (i *interceptingResourceMonitor) StreamInvoke(ctx context.Context,
in *pulumirpc.InvokeRequest,
in *pulumirpc.ResourceInvokeRequest,
opts ...grpc.CallOption) (pulumirpc.ResourceMonitor_StreamInvokeClient, error) {
return i.inner.StreamInvoke(ctx, in, opts...)
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/nodejs/cmd/pulumi-language-nodejs/proxy.go
Expand Up @@ -101,7 +101,7 @@ func servePipes(ctx context.Context, pipes pipes, target pulumirpc.ResourceMonit

// decode and dispatch the request
logging.V(10).Infof("Sync invoke: Unmarshalling request")
var req pulumirpc.InvokeRequest
var req pulumirpc.ResourceInvokeRequest
if err := pbcodec.Unmarshal(reqBytes, &req); err != nil {
logging.V(10).Infof("Sync invoke: Received error reading full from pipe: %s\n", err)
return err
Expand Down Expand Up @@ -187,12 +187,12 @@ type monitorProxy struct {
}

func (p *monitorProxy) Invoke(
ctx context.Context, req *pulumirpc.InvokeRequest) (*pulumirpc.InvokeResponse, error) {
ctx context.Context, req *pulumirpc.ResourceInvokeRequest) (*pulumirpc.InvokeResponse, error) {
return p.target.Invoke(ctx, req)
}

func (p *monitorProxy) StreamInvoke(
req *pulumirpc.InvokeRequest, server pulumirpc.ResourceMonitor_StreamInvokeServer) error {
req *pulumirpc.ResourceInvokeRequest, server pulumirpc.ResourceMonitor_StreamInvokeServer) error {

client, err := p.target.StreamInvoke(context.Background(), req)
if err != nil {
Expand Down

0 comments on commit 32dcaa5

Please sign in to comment.