Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn users when there are pending operations but proceed with deployment #9293

Merged
merged 9 commits into from Apr 5, 2022
3 changes: 3 additions & 0 deletions CHANGELOG_PENDING.md
Expand Up @@ -3,6 +3,9 @@
- Clear pending operations during `pulumi refresh` or `pulumi up -r`.
[#8435](https://github.com/pulumi/pulumi/pull/8435)

- [cli] Warn users when there are pending operations but proceed with deployment
[#9293](https://github.com/pulumi/pulumi/pull/9293)

### Bug Fixes

- [codegen/go] - Fix Go SDK function output to check for errors
Expand Down
32 changes: 0 additions & 32 deletions pkg/cmd/pulumi/errors.go
Expand Up @@ -7,7 +7,6 @@ import (
"io"

"github.com/pulumi/pulumi/pkg/v3/engine"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand All @@ -26,10 +25,6 @@ func PrintEngineResult(res result.Result) result.Result {
err := res.Error()

switch e := err.(type) {
case deploy.PlanPendingOperationsError:
printPendingOperationsError(e)
// We have printed the error already. Should just bail at this point.
return result.Bail()
case engine.DecryptError:
printDecryptError(e)
// We have printed the error already. Should just bail at this point.
Expand All @@ -40,33 +35,6 @@ func PrintEngineResult(res result.Result) result.Result {
}
}

func printPendingOperationsError(e deploy.PlanPendingOperationsError) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
fprintf(writer,
"the current deployment has %d resource(s) with pending operations:\n", len(e.Operations))

for _, op := range e.Operations {
fprintf(writer, " * %s, interrupted while %s\n", op.Resource.URN, op.Type)
}

fprintf(writer, `
These resources are in an unknown state because the Pulumi CLI was interrupted while
waiting for changes to these resources to complete. You should confirm whether or not the
operations listed completed successfully by checking the state of the appropriate provider.
For example, if you are using AWS, you can confirm using the AWS Console.

Once you have confirmed the status of the interrupted operations, you can repair your stack
using 'pulumi stack export' to export your stack to a file. For each operation that succeeded,
remove that operation from the "pending_operations" section of the file. Once this is complete,
use 'pulumi stack import' to import the repaired stack.

refusing to proceed`)
contract.IgnoreError(writer.Flush())

cmdutil.Diag().Errorf(diag.RawMessage("" /*urn*/, buf.String()))
}

func printDecryptError(e engine.DecryptError) {
var buf bytes.Buffer
writer := bufio.NewWriter(&buf)
Expand Down
179 changes: 169 additions & 10 deletions pkg/engine/lifeycletest/pulumi_test.go
Expand Up @@ -550,11 +550,6 @@ func TestPreviewWithPendingOperations(t *testing.T) {
// A preview should succeed despite the pending operations.
_, res := op.Run(project, target, options, true, nil, nil)
assert.Nil(t, res)

// But an update should fail.
_, res = op.Run(project, target, options, false, nil, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should test we get the expected warning here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see this is now just for preview and we have tests elsewhere for updates.

assertIsErrorOrBailResult(t, res)
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())
}

// Tests that a refresh works for a stack with pending operations.
Expand Down Expand Up @@ -605,11 +600,6 @@ func TestRefreshWithPendingOperations(t *testing.T) {
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// Without refreshing, an update should fail.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

_, res := op.Run(project, target, options, false, nil, nil)
assertIsErrorOrBailResult(t, res)
assert.EqualError(t, res.Error(), deploy.PlanPendingOperationsError{}.Error())

// With a refresh, the update should succeed.
withRefresh := options
withRefresh.Refresh = true
Expand All @@ -626,6 +616,175 @@ func TestRefreshWithPendingOperations(t *testing.T) {
assert.Nil(t, res)
}

// Test to make sure that if we pulumi refresh
// while having pending CREATE operations,
// that these are preserved after the refresh.
func TestRefreshPreservesPendingCreateOperations(t *testing.T) {
t.Parallel()

p := &TestPlan{}

const resType = "pkgA:m:typA"
urnA := p.NewURN(resType, "resA", "")
urnB := p.NewURN(resType, "resB", "")

newResource := func(urn resource.URN, id resource.ID, delete bool, dependencies ...resource.URN) *resource.State {
return &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
Delete: delete,
ID: id,
Inputs: resource.PropertyMap{},
Outputs: resource.PropertyMap{},
Dependencies: dependencies,
}
}

// Notice here, we have two pending operations: update and create
// After a refresh, only the pending CREATE operation should
// be in the updated snapshot
resA := newResource(urnA, "0", false)
resB := newResource(urnB, "0", false)
old := &deploy.Snapshot{
PendingOperations: []resource.Operation{
{
Resource: resA,
Type: resource.OperationTypeUpdating,
},
{
Resource: resB,
Type: resource.OperationTypeCreating,
},
},
Resources: []*resource.State{
resA,
},
}

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}

program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
return nil
})

op := TestOp(Update)
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// With a refresh, the update should succeed.
withRefresh := options
withRefresh.Refresh = true
new, res := op.Run(project, target, withRefresh, false, nil, nil)
assert.Nil(t, res)
// Assert that pending CREATE operation was preserved
assert.Len(t, new.PendingOperations, 1)
assert.Equal(t, resource.OperationTypeCreating, new.PendingOperations[0].Type)
assert.Equal(t, urnB, new.PendingOperations[0].Resource.URN)
}

func findPendingOperationsByType(opType resource.OperationType, snapshot *deploy.Snapshot) []resource.Operation {
var operations []resource.Operation
for _, operation := range snapshot.PendingOperations {
if operation.Type == opType {
operations = append(operations, operation)
}
}
return operations
}

// Update succeeds but gives a warning when there are pending operations
func TestUpdateShowsWarningWithPendingOperations(t *testing.T) {
t.Parallel()

p := &TestPlan{}

const resType = "pkgA:m:typA"
urnA := p.NewURN(resType, "resA", "")
urnB := p.NewURN(resType, "resB", "")

newResource := func(urn resource.URN, id resource.ID, delete bool, dependencies ...resource.URN) *resource.State {
return &resource.State{
Type: urn.Type(),
URN: urn,
Custom: true,
Delete: delete,
ID: id,
Inputs: resource.PropertyMap{},
Outputs: resource.PropertyMap{},
Dependencies: dependencies,
}
}

old := &deploy.Snapshot{
PendingOperations: []resource.Operation{
{
Resource: newResource(urnA, "0", false),
Type: resource.OperationTypeUpdating,
},
{
Resource: newResource(urnB, "1", false),
Type: resource.OperationTypeCreating,
}},
Resources: []*resource.State{
newResource(urnA, "0", false),
},
}

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{}, nil
}),
}

program := deploytest.NewLanguageRuntime(func(_ plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, _, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true)
assert.NoError(t, err)
return nil
})

op := TestOp(Update)
options := UpdateOptions{Host: deploytest.NewPluginHost(nil, nil, program, loaders...)}
project, target := p.GetProject(), p.GetTarget(t, old)

// The update should succeed but give a warning
initialPartOfMessage := "Attempting to deploy or update resources with 1 pending operations from previous deployment."
validate := func(
project workspace.Project, target deploy.Target,
entries JournalEntries, events []Event,
res result.Result) result.Result {
for i := range events {
if events[i].Type == "diag" {
payload := events[i].Payload().(engine.DiagEventPayload)

if payload.Severity == "warning" && strings.Contains(payload.Message, initialPartOfMessage) {
return nil
}
return result.Errorf("Unexpected warning diag message: %s", payload.Message)
}
}
return result.Error("Expected a diagnostic message, got none")
}

new, _ := op.Run(project, target, options, false, nil, validate)
assert.NotNil(t, new)

assert.Equal(t, resource.OperationTypeCreating, new.PendingOperations[0].Type)

// Assert that CREATE pending operations are retained
// TODO: should revisit whether non-CREATE pending operations should also be retained
assert.Equal(t, 1, len(new.PendingOperations))
createOperations := findPendingOperationsByType(resource.OperationTypeCreating, new)
assert.Equal(t, 1, len(createOperations))
assert.Equal(t, urnB, createOperations[0].Resource.URN)
}

// Tests that a failed partial update causes the engine to persist the resource's old inputs and new outputs.
func TestUpdatePartialFailure(t *testing.T) {
t.Parallel()
Expand Down
11 changes: 0 additions & 11 deletions pkg/resource/deploy/deployment.go
Expand Up @@ -98,17 +98,6 @@ type Events interface {
PolicyEvents
}

// PlanPendingOperationsError is an error returned from `NewPlan` if there exist pending operations in the
// snapshot that we are preparing to operate upon. The engine does not allow any operations to be pending
// when operating on a snapshot.
type PlanPendingOperationsError struct {
Operations []resource.Operation
}

func (p PlanPendingOperationsError) Error() string {
return "one or more operations are currently pending"
}

type goalMap struct {
m sync.Map
}
Expand Down
36 changes: 34 additions & 2 deletions pkg/resource/deploy/deployment_executor.go
Expand Up @@ -97,6 +97,35 @@ func (ex *deploymentExecutor) checkTargets(targets []resource.URN, op StepOp) re
return nil
}

func (ex *deploymentExecutor) printPendingOperationsWarning() {
pendingOperations := ""
for _, op := range ex.deployment.prev.PendingOperations {
pendingOperations = pendingOperations + fmt.Sprintf(" * %s, interrupted while %s\n", op.Resource.URN, op.Type)
}

resolutionMessage := `These resources are in an unknown state because the Pulumi CLI was interrupted while
waiting for changes to these resources to complete. You should confirm whether or not the
operations listed completed successfully by checking the state of the appropriate provider.
For example, if you are using AWS, you can confirm using the AWS Console.

Once you have confirmed the status of the interrupted operations, you can repair your stack
using 'pulumi refresh' which will refresh the state from the provider you are using and
clear the pending operations if there are any.

Note that 'pulumi refresh' will not clear pending CREATE operations since those could have resulted in resources
which are not tracked by pulumi. To repair the stack and remove pending CREATE operation,
use 'pulumi stack export' which will export your stack to a file. For each operation that succeeded,
remove that operation from the "pending_operations" section of the file. Once this is complete,
use 'pulumi stack import' to import the repaired stack.`

warning := "Attempting to deploy or update resources " +
fmt.Sprintf("with %d pending operations from previous deployment.\n", len(ex.deployment.prev.PendingOperations)) +
pendingOperations +
resolutionMessage

ex.deployment.Diag().Warningf(diag.RawMessage("" /*urn*/, warning))
}

// reportExecResult issues an appropriate diagnostic depending on went wrong.
func (ex *deploymentExecutor) reportExecResult(message string, preview bool) {
kind := "update"
Expand Down Expand Up @@ -146,8 +175,11 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
if opts.RefreshOnly {
return nil, nil
}
} else if ex.deployment.prev != nil && len(ex.deployment.prev.PendingOperations) != 0 && !preview {
return nil, result.FromError(PlanPendingOperationsError{ex.deployment.prev.PendingOperations})
} else if ex.deployment.prev != nil && len(ex.deployment.prev.PendingOperations) > 0 && !preview {
// Print a warning for users that there are pending operations.
// Explain that these operations can be cleared using pulumi refresh (except for CREATE operations)
// since these require user intevention:
ex.printPendingOperationsWarning()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to what we did with refresh I think we might want to do something to ensure that after doing the update our snapshot still tracks that there were pending operations that were not resovled.

}

// The set of -t targets provided on the command line. 'nil' means 'update everything'.
Expand Down