Skip to content

Commit

Permalink
[gh-6980] Client: clean up old allocs before running new ones using t…
Browse files Browse the repository at this point in the history
…he `exec` task driver. (#20500)

Whenever the "exec" task driver is used, Nomad runs a plugin that in turn runs the task in a container under the hood. If for any reason the executor is killed, the task is reparented to the init service and won't be stopped by Nomad when the job is updated or stopped.

This commit introduces two mechanisms to avoid this behaviour:

* Adds signal catching and handling to the executor, so in case of a SIGTERM, the signal will also be passed on to the task.
* Adds a pre-start cleanup of the processes in the container, ensuring only the ones the executor runs are present at any given time.
  • Loading branch information
Juanadelacuesta committed May 14, 2024
1 parent 7b2343a commit 480d6b0
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/20500.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: terminate old exec task processes before starting new ones, to avoid accidentally leaving running processes in case of an error
```
59 changes: 58 additions & 1 deletion drivers/shared/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strings"
Expand Down Expand Up @@ -68,19 +69,73 @@ type LibcontainerExecutor struct {
userProc *libcontainer.Process
userProcExited chan interface{}
exitState *ProcessState
sigChan chan os.Signal
}

// catchSignals forwards OS signals received by the executor to the task's
// container, so a killed executor does not leave its task reparented to init.
// SIGTERM and SIGINT additionally trigger a graceful executor shutdown; all
// other registered signals are relayed to the container.
func (l *LibcontainerExecutor) catchSignals() {
	l.logger.Trace("waiting for signals")

	// Deferred calls run LIFO: declare close first so that signal.Stop
	// executes before close, guaranteeing the runtime can no longer deliver
	// a signal into a closed channel (which would panic).
	defer close(l.sigChan)
	defer signal.Stop(l.sigChan)

	signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
	for {
		// Named "sig" to avoid shadowing the os/signal package.
		sig := <-l.sigChan
		if sig == syscall.SIGTERM || sig == syscall.SIGINT {
			// "SIGINT" selects the executor's graceful shutdown path for
			// both interrupt and terminate.
			l.Shutdown("SIGINT", 0)
			break
		}

		if l.container != nil {
			l.container.Signal(sig, false)
		}
	}
}

func NewExecutorWithIsolation(logger hclog.Logger, cpuTotalTicks uint64) Executor {
logger = logger.Named("isolated_executor")
stats.SetCpuTotalTicks(cpuTotalTicks)

return &LibcontainerExecutor{
sigch := make(chan os.Signal, 4)

le := LibcontainerExecutor{
id: strings.ReplaceAll(uuid.Generate(), "-", "_"),
logger: logger,
totalCpuStats: stats.NewCpuStats(),
userCpuStats: stats.NewCpuStats(),
systemCpuStats: stats.NewCpuStats(),
pidCollector: newPidCollector(logger),
sigChan: sigch,
}

go le.catchSignals()
return le

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / test-e2e

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-groups (nomad)

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-api

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-groups (client)

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-groups (command)

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-groups (drivers)

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / tests-groups (quick)

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / checks / checks

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver) (typecheck)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / checks / checks

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)) (typecheck)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / checks / checks

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)) (typecheck)

Check failure on line 111 in drivers/shared/executor/executor_linux.go

View workflow job for this annotation

GitHub Actions / checks / checks

cannot use le (variable of type LibcontainerExecutor) as Executor value in return statement: LibcontainerExecutor does not implement Executor (method Exec has pointer receiver)) (typecheck)
}

// cleanOldProcessesInCGroup kills processes that may have been orphaned when a
// previous executor was unexpectedly killed and Nomad could not reconnect to
// them, ensuring the cgroup holds only processes this executor starts.
// Failures are logged and skipped: cleanup is best-effort and must not block
// the task launch.
func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) {
	l.logger.Debug("looking for old processes", "path", nomadRelativePath)

	root := cgutil.CgroupRoot
	orphanPIDs, err := cgroups.GetAllPids(filepath.Join(root, nomadRelativePath))
	if err != nil {
		l.logger.Error("unable to get orphaned task PIDs", "error", err)
		return
	}

	for _, pid := range orphanPIDs {
		// Never signal PID 1: killing init would bring down the whole node.
		// Very unlikely to appear here, but better to be sure. Check before
		// logging so we don't claim to kill a process we skip.
		if pid == 1 {
			continue
		}

		l.logger.Info("killing orphaned process", "pid", pid)
		if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
			l.logger.Error("unable to send signal to process", "pid", pid, "error", err)
		}
	}
}

Expand Down Expand Up @@ -114,6 +169,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}

l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path)
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
Expand Down Expand Up @@ -153,6 +209,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
if command.User != "" {
process.User = command.User
}

l.userProc = process

l.totalCpuStats = stats.NewCpuStats()
Expand Down
117 changes: 117 additions & 0 deletions drivers/shared/executor/executor_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand All @@ -27,6 +29,7 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups"
lconfigs "github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -886,3 +889,117 @@ func TestUniversalExecutor_NoCgroup(t *testing.T) {
})

}

// createCGroup ensures the cgroup directory at fullpath exists and returns an
// interface for interacting with it.
func createCGroup(fullpath string) (cgroupslib.Interface, error) {
	err := os.MkdirAll(fullpath, 0755)
	if err != nil {
		return nil, err
	}

	return cgroupslib.OpenPath(fullpath), nil
}

// TestExecutor_CleanOldProcessesInCGroup verifies that the executor kills any
// pre-existing processes found in its cgroup before launching its own command.
func TestExecutor_CleanOldProcessesInCGroup(t *testing.T) {
	ci.Parallel(t)

	testutil.ExecCompatible(t)
	testutil.CgroupsCompatible(t)

	testExecCmd := testExecutorCommandWithChroot(t)

	allocDir := testExecCmd.allocDir
	defer allocDir.Destroy()

	fullCGroupPath := testExecCmd.command.Resources.LinuxResources.CpusetCgroupPath

	execCmd := testExecCmd.command
	execCmd.Cmd = "/bin/sleep"
	execCmd.Args = []string{"1"}
	execCmd.ResourceLimits = true
	execCmd.ModePID = "private"
	execCmd.ModeIPC = "private"

	// Create the CGroup the executor's command will run in and populate it
	// with one process the executor should clean up.
	cgInterface, err := createCGroup(fullCGroupPath)
	must.NoError(t, err)

	cmd := exec.Command("/bin/sleep", "3000")
	err = cmd.Start()
	must.NoError(t, err)

	go func() {
		// This process is expected to be killed by the executor as a
		// prerequisite to running its own command; reap it here so it does
		// not linger as a zombie. Do NOT assert on the error: failing a test
		// from a goroutine that may outlive the test is invalid use of t.
		_ = cmd.Wait()
	}()

	pid := cmd.Process.Pid
	must.Positive(t, pid)

	err = cgInterface.Write("cgroup.procs", strconv.Itoa(pid))
	must.NoError(t, err)

	pids, err := cgInterface.PIDs()
	must.NoError(t, err)
	must.One(t, pids.Size())

	// Run the executor normally and make sure the process that was originally
	// running as part of the CGroup was killed, and only the executor's
	// process is running.
	execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)
	executor := execInterface.(*LibcontainerExecutor)
	defer executor.Shutdown("SIGKILL", 0)

	ps, err := executor.Launch(execCmd)
	must.NoError(t, err)
	must.Positive(t, ps.Pid)

	pids, err = cgInterface.PIDs()
	must.NoError(t, err)
	must.One(t, pids.Size())
	must.True(t, pids.Contains(ps.Pid))
	must.False(t, pids.Contains(pid))

	estate, err := executor.Wait(context.Background())
	must.NoError(t, err)
	must.Zero(t, estate.ExitCode)

	must.NoError(t, executor.Shutdown("", 0))
	executor.Wait(context.Background())
}

// TestExecutor_SignalCatching verifies that a SIGTERM delivered to the
// executor's signal channel shuts down the running container.
func TestExecutor_SignalCatching(t *testing.T) {
	ci.Parallel(t)

	testutil.ExecCompatible(t)
	testutil.CgroupsCompatible(t)

	testExecCmd := testExecutorCommandWithChroot(t)

	allocDir := testExecCmd.allocDir
	defer allocDir.Destroy()

	execCmd := testExecCmd.command
	execCmd.Cmd = "/bin/sleep"
	execCmd.Args = []string{"100"}
	execCmd.ResourceLimits = true
	execCmd.ModePID = "private"
	execCmd.ModeIPC = "private"

	execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)

	ps, err := execInterface.Launch(execCmd)
	must.NoError(t, err)
	must.Positive(t, ps.Pid)

	executor := execInterface.(*LibcontainerExecutor)
	status, err := executor.container.OCIState()
	must.NoError(t, err)
	must.Eq(t, specs.StateRunning, status.Status)

	executor.sigChan <- syscall.SIGTERM

	// The signal handler shuts the container down asynchronously; poll with a
	// deadline instead of a fixed sleep, which is flaky on slow machines.
	deadline := time.Now().Add(10 * time.Second)
	for {
		status, err = executor.container.OCIState()
		must.NoError(t, err)
		if status.Status == specs.StateStopped {
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("container did not stop after SIGTERM, last status: %v", status.Status)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

0 comments on commit 480d6b0

Please sign in to comment.