Skip to content

Commit

Permalink
Try #11027:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Oct 14, 2022
2 parents 39e2ed5 + 9994978 commit 36cac60
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 70 deletions.
4 changes: 4 additions & 0 deletions changelog/pending/20221014--engine--pending-deletes.yaml
@@ -0,0 +1,4 @@
changes:
- type: fix
scope: engine
description: Pending deletes are no longer executed before everything else. This correctly handles dependencies for resource graphs that were partially deleted.
160 changes: 160 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -3956,3 +3956,163 @@ func TestDefaultParents(t *testing.T) {
assert.Equal(t, snap.Resources[0].URN, snap.Resources[1].Parent)
assert.Equal(t, snap.Resources[0].URN, snap.Resources[2].Parent)
}

func TestPendingDeleteOrder(t *testing.T) {
// Test for https://github.com/pulumi/pulumi/issues/2948 Ensure that if we have resources A and B, and we
// go to replace A but then fail to replace B that we correctly handle everything in the same order when
// we retry the update.
//
// That is normally for this operation we would do the following:
// 1. Create new A
// 2. Create new B
// 3. Delete old B
// 4. Delete old A
// So if step 2 fails to create the new B we want to see:
// 1. Create new A
// 2. Create new B (fail)
// 1. Create new B
// 2. Delete old B
// 3. Delete old A
// Currently (and what #2948 tracks) is that the engine does the following:
// 1. Create new A
// 2. Create new B (fail)
// 3. Delete old A
// 1. Create new B
// 2. Delete old B
// That delete A fails because the delete B needs to happen first.

t.Parallel()

cloudState := map[resource.ID]resource.PropertyMap{}

failCreationOfTypB := false

loaders := []*deploytest.ProviderLoader{
deploytest.NewProviderLoader("pkgA", semver.MustParse("1.0.0"), func() (plugin.Provider, error) {
return &deploytest.Provider{
CreateF: func(urn resource.URN, news resource.PropertyMap, timeout float64,
preview bool) (resource.ID, resource.PropertyMap, resource.Status, error) {

if strings.Contains(string(urn), "typB") && failCreationOfTypB {
return "", nil, resource.StatusOK, fmt.Errorf("Could not create typB")
}

id := resource.ID(fmt.Sprintf("%d", len(cloudState)))
if !preview {
cloudState[id] = news
}
return id, news, resource.StatusOK, nil
},
DeleteF: func(urn resource.URN,
id resource.ID, olds resource.PropertyMap, timeout float64) (resource.Status, error) {
// Fail if anything in cloud state still points to us
for _, res := range cloudState {
for _, v := range res {
if v.IsString() && v.StringValue() == string(id) {
return resource.StatusOK, fmt.Errorf("Can not delete %s", id)
}
}
}

delete(cloudState, id)
return resource.StatusOK, nil
},
DiffF: func(urn resource.URN,
id resource.ID, olds, news resource.PropertyMap, ignoreChanges []string) (plugin.DiffResult, error) {
if strings.Contains(string(urn), "typA") {
if !olds["foo"].DeepEquals(news["foo"]) {
return plugin.DiffResult{
Changes: plugin.DiffSome,
ReplaceKeys: []resource.PropertyKey{"foo"},
DetailedDiff: map[string]plugin.PropertyDiff{
"foo": {
Kind: plugin.DiffUpdateReplace,
InputDiff: true,
},
},
DeleteBeforeReplace: false,
}, nil
}
}
if strings.Contains(string(urn), "typB") {
if !olds["parent"].DeepEquals(news["parent"]) {
return plugin.DiffResult{
Changes: plugin.DiffSome,
ReplaceKeys: []resource.PropertyKey{"parent"},
DetailedDiff: map[string]plugin.PropertyDiff{
"parent": {
Kind: plugin.DiffUpdateReplace,
InputDiff: true,
},
},
DeleteBeforeReplace: false,
}, nil
}
}

return plugin.DiffResult{}, nil
},
UpdateF: func(urn resource.URN,
id resource.ID, olds, news resource.PropertyMap, timeout float64,
ignoreChanges []string, preview bool) (resource.PropertyMap, resource.Status, error) {
assert.Fail(t, "Didn't expect update to be called")
return nil, resource.StatusOK, nil
},
}, nil
}, deploytest.WithoutGrpc),
}

ins := resource.NewPropertyMapFromMap(map[string]interface{}{
"foo": "bar",
})
program := deploytest.NewLanguageRuntime(func(info plugin.RunInfo, monitor *deploytest.ResourceMonitor) error {
_, idA, _, err := monitor.RegisterResource("pkgA:m:typA", "resA", true, deploytest.ResourceOptions{
Inputs: ins,
})
assert.NoError(t, err)

_, _, _, err = monitor.RegisterResource("pkgA:m:typB", "resB", true, deploytest.ResourceOptions{
Inputs: resource.NewPropertyMapFromMap(map[string]interface{}{
"parent": idA,
}),
})
assert.NoError(t, err)

return nil
})
host := deploytest.NewPluginHost(nil, nil, program, loaders...)

p := &TestPlan{
Options: UpdateOptions{Host: host},
}

project := p.GetProject()

// Run an update to create the resources
snap, res := TestOp(Update).Run(project, p.GetTarget(t, nil), p.Options, false, p.BackendClient, nil)
assert.Nil(t, res)
assert.NotNil(t, snap)
assert.Len(t, snap.Resources, 3)

// Trigger a replacement of A but fail to create B
failCreationOfTypB = true
ins = resource.NewPropertyMapFromMap(map[string]interface{}{
"foo": "baz",
})
snap, res = TestOp(Update).Run(project, p.GetTarget(t, snap), p.Options, false, p.BackendClient, nil)
// Assert that this fails, we should have two copies of A now, one new one and one old one pending delete
assert.NotNil(t, res)
assert.NotNil(t, snap)
assert.Len(t, snap.Resources, 4)
assert.Equal(t, snap.Resources[1].Type, tokens.Type("pkgA:m:typA"))
assert.False(t, snap.Resources[1].Delete)
assert.Equal(t, snap.Resources[2].Type, tokens.Type("pkgA:m:typA"))
assert.True(t, snap.Resources[2].Delete)

// Now allow B to create and try again
failCreationOfTypB = false
snap, res = TestOp(Update).Run(project, p.GetTarget(t, snap), p.Options, false, p.BackendClient, nil)
assert.Nil(t, res)
assert.NotNil(t, snap)
assert.Len(t, snap.Resources, 3)
}
50 changes: 0 additions & 50 deletions pkg/resource/deploy/deployment_executor.go
Expand Up @@ -207,11 +207,6 @@ func (ex *deploymentExecutor) Execute(callerCtx context.Context, opts Options, p
// Set up a step generator for this deployment.
ex.stepGen = newStepGenerator(ex.deployment, opts, updateTargetsOpt, replaceTargetsOpt)

// Retire any pending deletes that are currently present in this deployment.
if res := ex.retirePendingDeletes(callerCtx, opts, preview); res != nil {
return nil, res
}

// Derive a cancellable context for this deployment. We will only cancel this context if some piece of the
// deployment's execution fails.
ctx, cancel := context.WithCancel(callerCtx)
Expand Down Expand Up @@ -459,51 +454,6 @@ func (ex *deploymentExecutor) handleSingleEvent(event SourceEvent) result.Result
return nil
}

// retirePendingDeletes deletes all resources that are pending deletion. Run before the start of a deployment, this pass
// ensures that the engine never sees any resources that are pending deletion from a previous deployment.
//
// retirePendingDeletes re-uses the deployment executor's step generator but uses its own step executor.
func (ex *deploymentExecutor) retirePendingDeletes(callerCtx context.Context, opts Options,
preview bool) result.Result {

contract.Require(ex.stepGen != nil, "ex.stepGen != nil")
steps := ex.stepGen.GeneratePendingDeletes()
if len(steps) == 0 {
logging.V(4).Infoln("deploymentExecutor.retirePendingDeletes(...): no pending deletions")
return nil
}

logging.V(4).Infof("deploymentExecutor.retirePendingDeletes(...): executing %d steps", len(steps))
ctx, cancel := context.WithCancel(callerCtx)

stepExec := newStepExecutor(ctx, cancel, ex.deployment, opts, preview, false)
antichains := ex.stepGen.ScheduleDeletes(steps)
// Submit the deletes for execution and wait for them all to retire.
for _, antichain := range antichains {
for _, step := range antichain {
ex.deployment.Ctx().StatusDiag.Infof(diag.RawMessage(step.URN(), "completing deletion from previous update"))
}

tok := stepExec.ExecuteParallel(antichain)
tok.Wait(ctx)
}

stepExec.SignalCompletion()
stepExec.WaitForCompletion()

// Like Refresh, we use the presence of an error in the caller's context to detect whether or not we have been
// cancelled.
canceled := callerCtx.Err() != nil
if stepExec.Errored() {
ex.reportExecResult("failed", preview)
return result.Bail()
} else if canceled {
ex.reportExecResult("canceled", preview)
return result.Bail()
}
return nil
}

// import imports a list of resources into a stack.
func (ex *deploymentExecutor) importResources(
callerCtx context.Context,
Expand Down
20 changes: 0 additions & 20 deletions pkg/resource/deploy/step_generator.go
Expand Up @@ -1228,26 +1228,6 @@ func (sg *stepGenerator) determineAllowedResourcesToDeleteFromTargets(
return resourcesToDelete, nil
}

// GeneratePendingDeletes generates delete steps for all resources that are pending deletion. This function should be
// called at the start of a deployment in order to find all resources that are pending deletion from the previous
// deployment.
func (sg *stepGenerator) GeneratePendingDeletes() []Step {
var dels []Step
if prev := sg.deployment.prev; prev != nil {
logging.V(7).Infof("stepGenerator.GeneratePendingDeletes(): scanning previous snapshot for pending deletes")
for i := len(prev.Resources) - 1; i >= 0; i-- {
res := prev.Resources[i]
if res.Delete {
logging.V(7).Infof(
"stepGenerator.GeneratePendingDeletes(): resource (%v, %v) is pending deletion", res.URN, res.ID)
sg.pendingDeletes[res] = true
dels = append(dels, NewDeleteStep(sg.deployment, res))
}
}
}
return dels
}

// ScheduleDeletes takes a list of steps that will delete resources and "schedules" them by producing a list of list of
// steps, where each list can be executed in parallel but a previous list must be executed to completion before
// advancing to the next list.
Expand Down

0 comments on commit 36cac60

Please sign in to comment.