Skip to content

Commit

Permalink
[engine] Clear pending operations with refresh. (#8435)
Browse files Browse the repository at this point in the history
* [engine] Clear pending operations with refresh.

Just what it says on the tin. This is implemented by moving the check
for pending operations in the last statefile into the deployment
executor and making it conditional on whether or not a refresh is being
performed (either via `pulumi refresh` or `pulumi up -r`). Because
pending operations are not carried over from the base statefile, this
has the effect of clearing pending operations if a refresh is performed.

Fixes #4265.

* CL

* nil ref

* adjust a test

* Return nil for the plan when executing deployment

* fix lifecycle test

* parallel test for TestRefreshWithPendingOperations

* preserve pending create operations

Co-authored-by: Zaid Ajaj <zaid.naom@gmail.com>
  • Loading branch information
pgavlin and Zaid-Ajaj committed Mar 25, 2022
1 parent 1df38c3 commit 36c6533
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
@@ -1,5 +1,8 @@
### Improvements

- Clear pending operations during `pulumi refresh` or `pulumi up -r`.
[#8435](https://github.com/pulumi/pulumi/pull/8435)

### Bug Fixes

- [codegen/go] - Fix Go SDK function output to check for errors
Expand Down
9 changes: 9 additions & 0 deletions pkg/backend/snapshot.go
Expand Up @@ -600,6 +600,15 @@ func (sm *SnapshotManager) snap() *deploy.Snapshot {
}
}

// Track pending create operations from the base snapshot
// and propagate them to the new snapshot: we don't want to clear pending CREATE operations
// because these must require user intervention to be cleared or resolved.
for _, pendingOperation := range sm.baseSnapshot.PendingOperations {
if pendingOperation.Type == resource.OperationTypeCreating {
operations = append(operations, pendingOperation)
}
}

manifest := deploy.Manifest{
Time: time.Now(),
Version: version.Version,
Expand Down
11 changes: 11 additions & 0 deletions pkg/engine/journal.go
Expand Up @@ -111,6 +111,17 @@ func (entries JournalEntries) Snap(base *deploy.Snapshot) *deploy.Snapshot {
}
}

if base != nil {
// Track pending create operations from the base snapshot
// and propagate them to the new snapshot: we don't want to clear pending CREATE operations
// because these must require user intervention to be cleared or resolved.
for _, pendingOperation := range base.PendingOperations {
if pendingOperation.Type == resource.OperationTypeCreating {
operations = append(operations, pendingOperation)
}
}
}

// If we have a base snapshot, copy over its secrets manager.
var secretsManager secrets.Manager
if base != nil {
Expand Down
69 changes: 69 additions & 0 deletions pkg/engine/lifeycletest/pulumi_test.go
Expand Up @@ -557,6 +557,75 @@ func TestPreviewWithPendingOperations(t *testing.T) {
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())
}

// Tests that a refresh works for a stack with pending operations.
func TestRefreshWithPendingOperations(t *testing.T) {
t.Parallel()

p := &TestPlan{}

const resType = "pkgA:m:typA"
urnA := p.NewURN(resType, "resA", "")

newResource := func(urn resource.URN, id resource.ID, delete bool, dependencies ...resource.URN) *resource.State {
return &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
Delete: delete,
ID: id,
Inputs: resource.PropertyMap{},
Outputs: resource.PropertyMap{},
Dependencies: dependencies,
}
}

old := &deploy.Snapshot{
PendingOperations: []resource.Operation{{
Resource: newResource(urnA, "0", false),
Type: resource.OperationTypeUpdating,
}},
Resources: []*resource.State{
newResource(urnA, "0", false),
},
}

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}

program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
return nil
})

op := TestOp(Update)
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// Without refreshing, an update should fail.
_, res := op.Run(project, target, options, false, nil, nil)
assertIsErrorOrBailResult(t, res)
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())

// With a refresh, the update should succeed.
withRefresh := options
withRefresh.Refresh = true
new, res := op.Run(project, target, withRefresh, false, nil, nil)
assert.Nil(t, res)
assert.Len(t, new.PendingOperations, 0)

// Similarly, the update should succeed if performed after a separate refresh.
new, res = TestOp(Refresh).Run(project, target, options, false, nil, nil)
assert.Nil(t, res)
assert.Len(t, new.PendingOperations, 0)

_, res = op.Run(project, p.GetTarget(t, new), options, false, nil, nil)
assert.Nil(t, res)
}

// Tests that a failed partial update causes the engine to persist the resource's old inputs and new outputs.
func TestUpdatePartialFailure(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 0 additions & 4 deletions pkg/resource/deploy/deployment.go
Expand Up @@ -305,10 +305,6 @@ func buildResourceMap(prev *Snapshot, preview bool) ([]*resource.State, map[reso
return nil, olds, nil
}

if prev.PendingOperations != nil && !preview {
return nil, nil, PlanPendingOperationsError{prev.PendingOperations}
}

for _, oldres := range prev.Resources {
// Ignore resources that are pending deletion; these should not be recorded in the LUT.
if oldres.Delete {
Expand Down
2 changes: 2 additions & 0 deletions pkg/resource/deploy/deployment_executor.go
Expand Up @@ -146,6 +146,8 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
if opts.RefreshOnly {
return nil, nil
}
} else if ex.deployment.prev != nil && len(ex.deployment.prev.PendingOperations) != 0 && !preview {
return nil, result.FromError(PlanPendingOperationsError{ex.deployment.prev.PendingOperations})
}

// The set of -t targets provided on the command line. 'nil' means 'update everything'.
Expand Down
13 changes: 1 addition & 12 deletions pkg/resource/deploy/deployment_test.go
Expand Up @@ -45,16 +45,5 @@ func TestPendingOperationsDeployment(t *testing.T) {
})

_, err := NewDeployment(&plugin.Context{}, &Target{}, snap, nil, &fixedSource{}, nil, false, nil)
if !assert.Error(t, err) {
t.FailNow()
}

invalidErr, ok := err.(PlanPendingOperationsError)
if !assert.True(t, ok) {
t.FailNow()
}

assert.Len(t, invalidErr.Operations, 1)
assert.Equal(t, resourceB.URN, invalidErr.Operations[0].Resource.URN)
assert.Equal(t, resource.OperationTypeCreating, invalidErr.Operations[0].Type)
assert.NoError(t, err)
}

0 comments on commit 36c6533

Please sign in to comment.