Skip to content

Commit

Permalink
Prevent concurrent writes to the component providers map
Browse files Browse the repository at this point in the history
  • Loading branch information
iwahbe committed Oct 25, 2022
1 parent 1180a27 commit da35443
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: engine
description: Prevent concurrent read/writes to the component providers map.
75 changes: 43 additions & 32 deletions pkg/resource/deploy/source_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/blang/semver"
Expand Down Expand Up @@ -477,18 +478,19 @@ func (d *defaultProviders) getDefaultProviderRef(req providers.ProviderRequest)
// resmon implements the pulumirpc.ResourceMonitor interface and acts as the gateway between a language runtime's
// evaluation of a program and the internal resource planning and deployment logic.
type resmon struct {
diagostics diag.Sink // logger for user-facing messages
providers ProviderSource // the provider source itself.
componentResourceProviders map[resource.URN]map[string]string // which providers component resources used
defaultProviders *defaultProviders // the default provider manager.
constructInfo plugin.ConstructInfo // information for construct and call calls.
regChan chan *registerResourceEvent // the channel to send resource registrations to.
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
diagostics diag.Sink // logger for user-facing messages
providers ProviderSource // the provider source itself.
componentProviders map[resource.URN]map[string]string // which providers component resources used
componentProvidersLock sync.Mutex // which locks the componentProviders map
defaultProviders *defaultProviders // the default provider manager.
constructInfo plugin.ConstructInfo // information for construct and call calls.
regChan chan *registerResourceEvent // the channel to send resource registrations to.
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
}

var _ SourceResourceMonitor = (*resmon)(nil)
Expand All @@ -513,16 +515,16 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg

// New up an engine RPC server.
resmon := &resmon{
diagostics: src.plugctx.Diag,
providers: provs,
defaultProviders: d,
componentResourceProviders: map[resource.URN]map[string]string{},
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
cancel: cancel,
disableResourceReferences: opts.DisableResourceReferences,
disableOutputValues: opts.DisableOutputValues,
diagostics: src.plugctx.Diag,
providers: provs,
defaultProviders: d,
componentProviders: map[resource.URN]map[string]string{},
regChan: regChan,
regOutChan: regOutChan,
regReadChan: regReadChan,
cancel: cancel,
disableResourceReferences: opts.DisableResourceReferences,
disableOutputValues: opts.DisableOutputValues,
}

// Fire up a gRPC server and start listening for incomings.
Expand Down Expand Up @@ -969,17 +971,22 @@ func (rm *resmon) RegisterResource(ctx context.Context,

// We handle updating the providers map to include the providers field of the parent if
// both the current resource and its parent is a component resource.
if parentsProviders, parentIsComponent := rm.componentResourceProviders[parent]; !custom &&
parent != "" && parentIsComponent {
for k, v := range parentsProviders {
if req.Providers == nil {
req.Providers = map[string]string{}
}
if _, ok := req.Providers[k]; !ok {
req.Providers[k] = v
func() {
// Function exists to scope the lock
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
if parentsProviders, parentIsComponent := rm.componentProviders[parent]; !custom &&
parent != "" && parentIsComponent {
for k, v := range parentsProviders {
if req.Providers == nil {
req.Providers = map[string]string{}
}
if _, ok := req.Providers[k]; !ok {
req.Providers[k] = v
}
}
}
}
}()

label := fmt.Sprintf("ResourceMonitor.RegisterResource(%s,%s)", t, name)

Expand Down Expand Up @@ -1201,7 +1208,11 @@ func (rm *resmon) RegisterResource(ctx context.Context,
}

if !custom && result != nil && result.State != nil && result.State.URN != "" {
rm.componentResourceProviders[result.State.URN] = req.GetProviders()
func() {
rm.componentProvidersLock.Lock()
defer rm.componentProvidersLock.Unlock()
rm.componentProviders[result.State.URN] = req.GetProviders()
}()
}

// Filter out partially-known values if the requestor does not support them.
Expand Down

0 comments on commit da35443

Please sign in to comment.