Skip to content

Commit

Permalink
Warn users when there are pending operations but proceed with deploym…
Browse files Browse the repository at this point in the history
…ent (#9293)

* Warn users when there are pending operations but proceed with deployment

* Show pending operations in warnings and reword what users need to do

* make linter happy

* make linter happy again

* test that pending CREATE operations are preserved after a refresh

* test for update succeeds but gives a warning when there are pending operations

* make linter happy

* make linter happy

* assert that pending create ops are retained by updates
  • Loading branch information
Zaid-Ajaj committed Apr 5, 2022
1 parent 667fd08 commit b713e5a
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -3,5 +3,8 @@
- [cli] - Installing of language specific project dependencies is now managed by the language plugins, not the pulumi cli.
[#9294](https://github.com/pulumi/pulumi/pull/9294)

- [cli] Warn users when there are pending operations but proceed with deployment
[#9293](https://github.com/pulumi/pulumi/pull/9293)

### Bug Fixes

32 changes: 0 additions & 32 deletions pkg/cmd/pulumi/errors.go
Expand Up @@ -7,7 +7,6 @@ import (
"io"

"github.com/pulumi/pulumi/pkg/v3/engine"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand All @@ -26,10 +25,6 @@ func PrintEngineResult(res result.Result) result.Result {
err := res.Error()

switch e := err.(type) {
case deploy.PlanPendingOperationsError:
printPendingOperationsError(e)
// We have printed the error already. Should just bail at this point.
return result.Bail()
case engine.DecryptError:
printDecryptError(e)
// We have printed the error already. Should just bail at this point.
Expand All @@ -40,33 +35,6 @@ func PrintEngineResult(res result.Result) result.Result {
}
}

func printPendingOperationsError(e deploy.PlanPendingOperationsError) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
fprintf(writer,
"the current deployment has %d resource(s) with pending operations:\n", len(e.Operations))

for _, op := range e.Operations {
fprintf(writer, " * %s, interrupted while %s\n", op.Resource.URN, op.Type)
}

fprintf(writer, `
These resources are in an unknown state because the Pulumi CLI was interrupted while
waiting for changes to these resources to complete. You should confirm whether or not the
operations listed completed successfully by checking the state of the appropriate provider.
For example, if you are using AWS, you can confirm using the AWS Console.
Once you have confirmed the status of the interrupted operations, you can repair your stack
using 'pulumi stack export' to export your stack to a file. For each operation that succeeded,
remove that operation from the "pending_operations" section of the file. Once this is complete,
use 'pulumi stack import' to import the repaired stack.
refusing to proceed`)
contract.IgnoreError(writer.Flush())

cmdutil.Diag().Errorf(diag.RawMessage("" /*urn*/, buf.String()))
}

func printDecryptError(e engine.DecryptError) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
Expand Down
179 changes: 169 additions & 10 deletions pkg/engine/lifeycletest/pulumi_test.go
Expand Up @@ -550,11 +550,6 @@ func TestPreviewWithPendingOperations(t *testing.T) {
// A preview should succeed despite the pending operations.
_, res := op.Run(project, target, options, true, nil, nil)
assert.Nil(t, res)

// But an update should fail.
_, res = op.Run(project, target, options, false, nil, nil)
assertIsErrorOrBailResult(t, res)
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())
}

// Tests that a refresh works for a stack with pending operations.
Expand Down Expand Up @@ -605,11 +600,6 @@ func TestRefreshWithPendingOperations(t *testing.T) {
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// Without refreshing, an update should fail.
_, res := op.Run(project, target, options, false, nil, nil)
assertIsErrorOrBailResult(t, res)
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())

// With a refresh, the update should succeed.
withRefresh := options
withRefresh.Refresh = true
Expand All @@ -626,6 +616,175 @@ func TestRefreshWithPendingOperations(t *testing.T) {
assert.Nil(t, res)
}

// Test to make sure that if we pulumi refresh
// while having pending CREATE operations,
// that these are preserved after the refresh.
func TestRefreshPreservesPendingCreateOperations(t *testing.T) {
t.Parallel()

p := &TestPlan{}

const resType = "pkgA:m:typA"
urnA := p.NewURN(resType, "resA", "")
urnB := p.NewURN(resType, "resB", "")

newResource := func(urn resource.URN, id resource.ID, delete bool, dependencies ...resource.URN) *resource.State {
return &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
Delete: delete,
ID: id,
Inputs: resource.PropertyMap{},
Outputs: resource.PropertyMap{},
Dependencies: dependencies,
}
}

// Notice here, we have two pending operations: update and create
// After a refresh, only the pending CREATE operation should
// be in the updated snapshot
resA := newResource(urnA, "0", false)
resB := newResource(urnB, "0", false)
old := &deploy.Snapshot{
PendingOperations: []resource.Operation{
{
Resource: resA,
Type: resource.OperationTypeUpdating,
},
{
Resource: resB,
Type: resource.OperationTypeCreating,
},
},
Resources: []*resource.State{
resA,
},
}

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}

program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
return nil
})

op := TestOp(Update)
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// With a refresh, the update should succeed.
withRefresh := options
withRefresh.Refresh = true
new, res := op.Run(project, target, withRefresh, false, nil, nil)
assert.Nil(t, res)
// Assert that pending CREATE operation was preserved
assert.Len(t, new.PendingOperations, 1)
assert.Equal(t, resource.OperationTypeCreating, new.PendingOperations[0].Type)
assert.Equal(t, urnB, new.PendingOperations[0].Resource.URN)
}

func findPendingOperationsByType(opType resource.OperationType, snapshot *deploy.Snapshot) []resource.Operation {
var operations []resource.Operation
for _, operation := range snapshot.PendingOperations {
if operation.Type == opType {
operations = append(operations, operation)
}
}
return operations
}

// Update succeeds but gives a warning when there are pending operations
func TestUpdateShowsWarningWithPendingOperations(t *testing.T) {
t.Parallel()

p := &TestPlan{}

const resType = "pkgA:m:typA"
urnA := p.NewURN(resType, "resA", "")
urnB := p.NewURN(resType, "resB", "")

newResource := func(urn resource.URN, id resource.ID, delete bool, dependencies ...resource.URN) *resource.State {
return &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
Delete: delete,
ID: id,
Inputs: resource.PropertyMap{},
Outputs: resource.PropertyMap{},
Dependencies: dependencies,
}
}

old := &deploy.Snapshot{
PendingOperations: []resource.Operation{
{
Resource: newResource(urnA, "0", false),
Type: resource.OperationTypeUpdating,
},
{
Resource: newResource(urnB, "1", false),
Type: resource.OperationTypeCreating,
}},
Resources: []*resource.State{
newResource(urnA, "0", false),
},
}

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}

program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
return nil
})

op := TestOp(Update)
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// The update should succeed but give a warning
initialPartOfMessage := "Attempting to deploy or update resources with 1 pending operations from previous deployment."
validate := func(
project workspace.Project, target deploy.Target,
entries JournalEntries, events []Event,
res result.Result) result.Result {
for i := range events {
if events[i].Type == "diag" {
payload := events[i].Payload().(engine.DiagEventPayload)

if payload.Severity == "warning" && strings.Contains(payload.Message, initialPartOfMessage) {
return nil
}
return result.Errorf("Unexpected warning diag message: %s", payload.Message)
}
}
return result.Error("Expected a diagnostic message, got none")
}

new, _ := op.Run(project, target, options, false, nil, validate)
assert.NotNil(t, new)

assert.Equal(t, resource.OperationTypeCreating, new.PendingOperations[0].Type)

// Assert that CREATE pending operations are retained
// TODO: should revisit whether non-CREATE pending operations should also be retained
assert.Equal(t, 1, len(new.PendingOperations))
createOperations := findPendingOperationsByType(resource.OperationTypeCreating, new)
assert.Equal(t, 1, len(createOperations))
assert.Equal(t, urnB, createOperations[0].Resource.URN)
}

// Tests that a failed partial update causes the engine to persist the resource's old inputs and new outputs.
func TestUpdatePartialFailure(t *testing.T) {
t.Parallel()
Expand Down
11 changes: 0 additions & 11 deletions pkg/resource/deploy/deployment.go
Expand Up @@ -98,17 +98,6 @@ type Events interface {
PolicyEvents
}

// PlanPendingOperationsError is an error returned from `NewPlan` if there exist pending operations in the
// snapshot that we are preparing to operate upon. The engine does not allow any operations to be pending
// when operating on a snapshot.
type PlanPendingOperationsError struct {
Operations []resource.Operation
}

func (p PlanPendingOperationsError) Error() string {
return "one or more operations are currently pending"
}

type goalMap struct {
m sync.Map
}
Expand Down
36 changes: 34 additions & 2 deletions pkg/resource/deploy/deployment_executor.go
Expand Up @@ -97,6 +97,35 @@ func (ex *deploymentExecutor) checkTargets(targets []resource.URN, op StepOp) re
return nil
}

func (ex *deploymentExecutor) printPendingOperationsWarning() {
pendingOperations := ""
for _, op := range ex.deployment.prev.PendingOperations {
pendingOperations = pendingOperations + fmt.Sprintf(" * %s, interrupted while %s\n", op.Resource.URN, op.Type)
}

resolutionMessage := `These resources are in an unknown state because the Pulumi CLI was interrupted while
waiting for changes to these resources to complete. You should confirm whether or not the
operations listed completed successfully by checking the state of the appropriate provider.
For example, if you are using AWS, you can confirm using the AWS Console.
Once you have confirmed the status of the interrupted operations, you can repair your stack
using 'pulumi refresh' which will refresh the state from the provider you are using and
clear the pending operations if there are any.
Note that 'pulumi refresh' will not clear pending CREATE operations since those could have resulted in resources
which are not tracked by pulumi. To repair the stack and remove pending CREATE operation,
use 'pulumi stack export' which will export your stack to a file. For each operation that succeeded,
remove that operation from the "pending_operations" section of the file. Once this is complete,
use 'pulumi stack import' to import the repaired stack.`

warning := "Attempting to deploy or update resources " +
fmt.Sprintf("with %d pending operations from previous deployment.\n", len(ex.deployment.prev.PendingOperations)) +
pendingOperations +
resolutionMessage

ex.deployment.Diag().Warningf(diag.RawMessage("" /*urn*/, warning))
}

// reportExecResult issues an appropriate diagnostic depending on went wrong.
func (ex *deploymentExecutor) reportExecResult(message string, preview bool) {
kind := "update"
Expand Down Expand Up @@ -146,8 +175,11 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
if opts.RefreshOnly {
return nil, nil
}
} else if ex.deployment.prev != nil && len(ex.deployment.prev.PendingOperations) != 0 && !preview {
return nil, result.FromError(PlanPendingOperationsError{ex.deployment.prev.PendingOperations})
} else if ex.deployment.prev != nil && len(ex.deployment.prev.PendingOperations) > 0 && !preview {
// Print a warning for users that there are pending operations.
// Explain that these operations can be cleared using pulumi refresh (except for CREATE operations)
// since these require user intevention:
ex.printPendingOperationsWarning()
}

// The set of -t targets provided on the command line. 'nil' means 'update everything'.
Expand Down

0 comments on commit b713e5a

Please sign in to comment.