Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent concurrent writes to the component providers map #11151

Merged
merged 1 commit into from Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: engine
description: Prevent concurrent read/writes to the component providers map.
75 changes: 43 additions & 32 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/blang/semver"
Expand Down Expand Up @@ -477,18 +478,19 @@ func (d *defaultProviders) getDefaultProviderRef(req providers.ProviderRequest)
// resmon implements the pulumirpc.ResourceMonitor interface and acts as the gateway between a language runtime's
// evaluation of a program and the internal resource planning and deployment logic.
type resmon struct {
diagostics diag.Sink // logger for user-facing messages
providers ProviderSource // the provider source itself.
componentResourceProviders map[resource.URN]map[string]string // which providers component resources used
defaultProviders *defaultProviders // the default provider manager.
constructInfo plugin.ConstructInfo // information for construct and call calls.
regChan chan *registerResourceEvent // the channel to send resource registrations to.
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
diagostics diag.Sink // logger for user-facing messages
providers ProviderSource // the provider source itself.
componentProviders map[resource.URN]map[string]string // which providers component resources used
componentProvidersLock sync.Mutex // which locks the componentProviders map
defaultProviders *defaultProviders // the default provider manager.
constructInfo plugin.ConstructInfo // information for construct and call calls.
regChan chan *registerResourceEvent // the channel to send resource registrations to.
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
}

var _ SourceResourceMonitor = (*resmon)(nil)
Expand All @@ -513,16 +515,16 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg

// New up an engine RPC server.
resmon := &resmon{
diagostics: src.plugctx.Diag,
providers: provs,
defaultProviders: d,
componentResourceProviders: map[resource.URN]map[string]string{},
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
cancel: cancel,
disableResourceReferences: opts.DisableResourceReferences,
disableOutputValues: opts.DisableOutputValues,
diagostics: src.plugctx.Diag,
providers: provs,
defaultProviders: d,
componentProviders: map[resource.URN]map[string]string{},
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
cancel: cancel,
disableResourceReferences: opts.DisableResourceReferences,
disableOutputValues: opts.DisableOutputValues,
}

// Fire up a gRPC server and start listening for incomings.
Expand Down Expand Up @@ -969,17 +971,22 @@ func (rm *resmon) RegisterResource(ctx context.Context,

// We handle updating the providers map to include the providers field of the parent if
// both the current resource and its parent is a component resource.
if parentsProviders, parentIsComponent := rm.componentResourceProviders[parent]; !custom &&
parent != "" && parentIsComponent {
for k, v := range parentsProviders {
if req.Providers == nil {
req.Providers = map[string]string{}
}
if _, ok := req.Providers[k]; !ok {
req.Providers[k] = v
func() {
// Function exists to scope the lock
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
if parentsProviders, parentIsComponent := rm.componentProviders[parent]; !custom &&
parent != "" && parentIsComponent {
for k, v := range parentsProviders {
if req.Providers == nil {
req.Providers = map[string]string{}
}
if _, ok := req.Providers[k]; !ok {
req.Providers[k] = v
}
}
}
}
}()

label := fmt.Sprintf("ResourceMonitor.RegisterResource(%s,%s)", t, name)

Expand Down Expand Up @@ -1201,7 +1208,11 @@ func (rm *resmon) RegisterResource(ctx context.Context,
}

if !custom && result != nil && result.State != nil && result.State.URN != "" {
rm.componentResourceProviders[result.State.URN] = req.GetProviders()
func() {
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
rm.componentProviders[result.State.URN] = req.GetProviders()
}()
}

// Filter out partially-known values if the requestor does not support them.
Expand Down