Skip to content

Commit

Permalink
Fix leaking goroutines in Install
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Słapek <28485371+mslapek@users.noreply.github.com>
  • Loading branch information
mslapek committed Sep 18, 2023
1 parent 5f2cd25 commit 169561a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 50 deletions.
93 changes: 43 additions & 50 deletions pkg/action/install.go
Expand Up @@ -219,6 +219,9 @@ func (i *Install) Run(chrt *chart.Chart, vals map[string]interface{}) (*release.
}

// Run executes the installation with Context
//
// When the task is cancelled through ctx, the function returns and the install
// proceeds in the background.
func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals map[string]interface{}) (*release.Release, error) {
// Check reachability of cluster unless in client-only mode (e.g. `helm template` without `--validate`)
if !i.ClientOnly {
Expand Down Expand Up @@ -385,17 +388,31 @@ func (i *Install) RunWithContext(ctx context.Context, chrt *chart.Chart, vals ma
// not working.
return rel, err
}
rChan := make(chan resultMessage)
ctxChan := make(chan resultMessage)
doneChan := make(chan struct{})
defer close(doneChan)
go i.performInstall(rChan, rel, toBeAdopted, resources)
go i.handleContext(ctx, ctxChan, doneChan, rel)

rel, err = i.performInstallCtx(ctx, rel, toBeAdopted, resources)
if err != nil {
rel, err = i.failRelease(rel, err)
}
return rel, err
}

func (i *Install) performInstallCtx(ctx context.Context, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
type Msg struct {
r *release.Release
e error
}
resultChan := make(chan Msg, 1)

go func() {
rel, err := i.performInstall(rel, toBeAdopted, resources)
resultChan <- Msg{rel, err}
}()
select {
case result := <-rChan:
return result.r, result.e
case result := <-ctxChan:
return result.r, result.e
case <-ctx.Done():
err := ctx.Err()
return rel, err
case msg := <-resultChan:
return msg.r, msg.e
}
}

Expand All @@ -407,49 +424,41 @@ func (i *Install) isDryRun() bool {
return false
}

func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) {

func (i *Install) performInstall(rel *release.Release, toBeAdopted kube.ResourceList, resources kube.ResourceList) (*release.Release, error) {
var err error
// pre-install hooks
if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPreInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed pre-install: %s", err))
return
return rel, fmt.Errorf("failed pre-install: %s", err)
}
}

// At this point, we can do the install. Note that before we were detecting whether to
// do an update, but it's not clear whether we WANT to do an update if the re-use is set
// to true, since that is basically an upgrade operation.
if len(toBeAdopted) == 0 && len(resources) > 0 {
if _, err := i.cfg.KubeClient.Create(resources); err != nil {
i.reportToRun(c, rel, err)
return
}
_, err = i.cfg.KubeClient.Create(resources)
} else if len(resources) > 0 {
if _, err := i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force); err != nil {
i.reportToRun(c, rel, err)
return
}
_, err = i.cfg.KubeClient.Update(toBeAdopted, resources, i.Force)
}
if err != nil {
return rel, err
}

if i.Wait {
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
}
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
}
err = i.cfg.KubeClient.Wait(resources, i.Timeout)
}
if err != nil {
return rel, err
}
}

if !i.DisableHooks {
if err := i.cfg.execHook(rel, release.HookPostInstall, i.Timeout); err != nil {
i.reportToRun(c, rel, fmt.Errorf("failed post-install: %s", err))
return
return rel, fmt.Errorf("failed post-install: %s", err)
}
}

Expand All @@ -470,25 +479,9 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
i.cfg.Log("failed to record the release: %s", err)
}

i.reportToRun(c, rel, nil)
}
func (i *Install) handleContext(ctx context.Context, c chan<- resultMessage, done chan struct{}, rel *release.Release) {
select {
case <-ctx.Done():
err := ctx.Err()
i.reportToRun(c, rel, err)
case <-done:
return
}
}
func (i *Install) reportToRun(c chan<- resultMessage, rel *release.Release, err error) {
i.Lock.Lock()
if err != nil {
rel, err = i.failRelease(rel, err)
}
c <- resultMessage{r: rel, e: err}
i.Lock.Unlock()
return rel, nil
}

func (i *Install) failRelease(rel *release.Release, err error) (*release.Release, error) {
rel.SetStatus(release.StatusFailed, fmt.Sprintf("Release %q failed: %s", i.ReleaseName, err.Error()))
if i.Atomic {
Expand Down
11 changes: 11 additions & 0 deletions pkg/action/install_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -369,10 +370,14 @@ func TestInstallRelease_Wait(t *testing.T) {
instAction.Wait = true
vals := map[string]interface{}{}

goroutines := runtime.NumGoroutine()

res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)

is.Equal(goroutines, runtime.NumGoroutine())
}
func TestInstallRelease_Wait_Interrupted(t *testing.T) {
is := assert.New(t)
Expand All @@ -388,10 +393,16 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
time.AfterFunc(time.Second, cancel)

goroutines := runtime.NumGoroutine()

res, err := instAction.RunWithContext(ctx, buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)

is.Equal(goroutines+1, runtime.NumGoroutine()) // installation goroutine still is in background
time.Sleep(10 * time.Second) // wait for goroutine to finish
is.Equal(goroutines, runtime.NumGoroutine())
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
Expand Down

0 comments on commit 169561a

Please sign in to comment.