Skip to content

Commit

Permalink
[gh-6980] Client: clean up old allocs before running new ones using t…
Browse files Browse the repository at this point in the history
…he `exec` task driver. (#20500) (#20584)

Whenever the "exec" task driver is being used, Nomad runs a plugin that in turn runs the task in a container under the hood. If by any circumstance the executor is killed, the task is reparented to the init process and won't be stopped by Nomad when the job is updated or stopped.

This commit introduces two mechanisms to avoid this behaviour:

* Adds signal catching and handling to the executor, so in case of a SIGTERM, the signal will also be passed on to the task.
* Adds a pre-start cleanup of the processes in the container, ensuring that only the ones run by the executor are present at any given time.

Co-authored-by: Juana De La Cuesta <juanita.delacuestamorales@hashicorp.com>
  • Loading branch information
hc-github-team-nomad-core and Juanadelacuesta committed May 15, 2024
1 parent 3b11930 commit 8601900
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/20500.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: terminate old exec task processes before starting new ones, to avoid accidentally leaving running processes in case of an error
```
4 changes: 4 additions & 0 deletions client/lib/cgroupslib/editor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
root = "/sys/fs/cgroup"
)

// GetDefaultRoot returns the default cgroup filesystem mount point
// ("/sys/fs/cgroup"), under which task cgroup paths are resolved.
func GetDefaultRoot() string {
	return root
}

// OpenPath creates a handle for modifying cgroup interface files under
// the given directory.
//
Expand Down
59 changes: 58 additions & 1 deletion drivers/shared/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strings"
Expand Down Expand Up @@ -78,17 +79,43 @@ type LibcontainerExecutor struct {
userProc *libcontainer.Process
userProcExited chan interface{}
exitState *ProcessState
sigChan chan os.Signal
}

// catchSignals listens for OS signals delivered to the executor process.
// SIGTERM and SIGINT shut the executor down (so the task is not orphaned
// and reparented to init); every other registered signal is forwarded to
// the task's container.
func (l *LibcontainerExecutor) catchSignals() {
	l.logger.Trace("waiting for signals")

	// Defers run LIFO: register close first so that signal.Stop executes
	// before the channel is closed, otherwise the runtime could attempt a
	// send on a closed channel and panic.
	defer close(l.sigChan)
	defer signal.Stop(l.sigChan)

	signal.Notify(l.sigChan, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSEGV)
	for {
		// "sig", not "signal", to avoid shadowing the os/signal package.
		sig := <-l.sigChan
		if sig == syscall.SIGTERM || sig == syscall.SIGINT {
			l.Shutdown("SIGINT", 0)
			break
		}

		if l.container != nil {
			l.container.Signal(sig, false)
		}
	}
}

// NewExecutorWithIsolation returns a libcontainer-backed Executor that runs
// commands inside an isolated container. It also starts a signal-catching
// goroutine so termination signals are propagated to the task instead of
// leaving it orphaned.
func NewExecutorWithIsolation(logger hclog.Logger, compute cpustats.Compute) Executor {
	sigChannel := make(chan os.Signal, 4)

	executor := &LibcontainerExecutor{
		id:             strings.ReplaceAll(uuid.Generate(), "-", "_"),
		logger:         logger.Named("isolated_executor"),
		compute:        compute,
		totalCpuStats:  cpustats.New(compute),
		userCpuStats:   cpustats.New(compute),
		systemCpuStats: cpustats.New(compute),
		sigChan:        sigChannel,
	}
	executor.processStats = procstats.New(compute, executor)

	go executor.catchSignals()

	return executor
}
Expand All @@ -97,6 +124,34 @@ func (l *LibcontainerExecutor) ListProcesses() *set.Set[int] {
return procstats.List(l.command)
}

// cleanOldProcessesInCGroup kills any processes still present in the task's
// cgroup. Such processes may have been orphaned when a previous executor was
// unexpectedly killed and nomad could not reconnect to them.
func (l *LibcontainerExecutor) cleanOldProcessesInCGroup(nomadRelativePath string) {
	l.logger.Debug("looking for old processes", "path", nomadRelativePath)

	cgroupPath := filepath.Join(cgroupslib.GetDefaultRoot(), nomadRelativePath)
	orphanPIDs, err := cgroups.GetAllPids(cgroupPath)
	if err != nil {
		l.logger.Error("unable to get orphaned task PIDs", "error", err)
		return
	}

	for _, orphanPID := range orphanPIDs {
		l.logger.Info("killing orphaned process", "pid", orphanPID)

		// Never signal init: taking down PID 1 would bring down the whole
		// node. Extremely unlikely here, but cheap to guard against.
		if orphanPID == 1 {
			continue
		}

		if killErr := syscall.Kill(orphanPID, syscall.SIGKILL); killErr != nil {
			l.logger.Error("unable to send signal to process", "pid", orphanPID, "error", killErr)
		}
	}
}

// Launch creates a new container in libcontainer and starts a new process with it
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) {
l.logger.Trace("preparing to launch command", "command", command.Cmd, "args", strings.Join(command.Args, " "))
Expand Down Expand Up @@ -127,6 +182,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
return nil, fmt.Errorf("failed to configure container(%s): %v", l.id, err)
}

l.cleanOldProcessesInCGroup(containerCfg.Cgroups.Path)
container, err := factory.Create(l.id, containerCfg)
if err != nil {
return nil, fmt.Errorf("failed to create container(%s): %v", l.id, err)
Expand Down Expand Up @@ -166,6 +222,7 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
if command.User != "" {
process.User = command.User
}

l.userProc = process

l.totalCpuStats = cpustats.New(l.compute)
Expand All @@ -187,7 +244,6 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
// start a goroutine to wait on the process to complete, so Wait calls can
// be multiplexed
l.userProcExited = make(chan interface{})

go l.wait()

return &ProcessState{
Expand Down Expand Up @@ -779,6 +835,7 @@ func (l *LibcontainerExecutor) configureCG2(cfg *runc.Config, command *ExecComma

func (l *LibcontainerExecutor) newLibcontainerConfig(command *ExecCommand) (*runc.Config, error) {
cfg := &runc.Config{
ParentDeathSignal: 9,
Cgroups: &runc.Cgroup{
Resources: &runc.Resources{
MemorySwappiness: nil,
Expand Down
117 changes: 117 additions & 0 deletions drivers/shared/executor/executor_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
"testing"
"time"

Expand All @@ -26,6 +28,7 @@ import (
tu "github.com/hashicorp/nomad/testutil"
lconfigs "github.com/opencontainers/runc/libcontainer/configs"
"github.com/opencontainers/runc/libcontainer/devices"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -792,3 +795,117 @@ func TestExecutor_cmdMounts(t *testing.T) {

require.EqualValues(t, expected, cmdMounts(input))
}

// createCGroup creates the cgroup directory at fullpath and returns a handle
// for editing its interface files.
func createCGroup(fullpath string) (cgroupslib.Interface, error) {
	err := os.MkdirAll(fullpath, 0755)
	if err != nil {
		return nil, err
	}

	return cgroupslib.OpenPath(fullpath), nil
}

// TestExecutor_CleanOldProcessesInCGroup verifies that Launch kills any
// processes already present in the task's cgroup before starting the new
// task, so orphans of a previously-killed executor do not survive.
func TestExecutor_CleanOldProcessesInCGroup(t *testing.T) {
	ci.Parallel(t)

	// Requires exec-driver (root) and cgroups support on the host.
	testutil.ExecCompatible(t)
	testutil.CgroupsCompatible(t)

	testExecCmd := testExecutorCommandWithChroot(t)

	allocDir := testExecCmd.allocDir
	defer allocDir.Destroy()

	fullCGroupPath := testExecCmd.command.Resources.LinuxResources.CpusetCgroupPath

	execCmd := testExecCmd.command
	execCmd.Cmd = "/bin/sleep"
	execCmd.Args = []string{"1"}
	execCmd.ResourceLimits = true
	execCmd.ModePID = "private"
	execCmd.ModeIPC = "private"

	// Create the CGroup the executor's command will run in and populate it with one process
	cgInterface, err := createCGroup(fullCGroupPath)
	must.NoError(t, err)

	// Long-running decoy process that will play the role of an orphan.
	cmd := exec.Command("/bin/sleep", "3000")
	err = cmd.Start()
	must.NoError(t, err)

	go func() {
		err := cmd.Wait()
		//This process will be killed by the executor as a prerequisite to run
		// the executors command.
		must.Error(t, err)
	}()

	pid := cmd.Process.Pid
	must.Positive(t, pid)

	// Move the decoy into the task cgroup so it looks like an orphan.
	err = cgInterface.Write("cgroup.procs", strconv.Itoa(pid))
	must.NoError(t, err)

	// Sanity check: exactly one (the decoy) process in the cgroup.
	pids, err := cgInterface.PIDs()
	must.NoError(t, err)
	must.One(t, pids.Size())

	// Run the executor normally and make sure the process that was originally running
	// as part of the CGroup was killed, and only the executor's process is running.
	execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)
	executor := execInterface.(*LibcontainerExecutor)
	defer executor.Shutdown("SIGKILL", 0)

	ps, err := executor.Launch(execCmd)
	must.NoError(t, err)
	must.Positive(t, ps.Pid)

	// After Launch, only the newly launched task should remain in the cgroup;
	// the decoy must have been killed by cleanOldProcessesInCGroup.
	pids, err = cgInterface.PIDs()
	must.NoError(t, err)
	must.One(t, pids.Size())
	must.True(t, pids.Contains(ps.Pid))
	must.False(t, pids.Contains(pid))

	// The task ("sleep 1") should exit cleanly.
	estate, err := executor.Wait(context.Background())
	must.NoError(t, err)
	must.Zero(t, estate.ExitCode)

	must.NoError(t, executor.Shutdown("", 0))
	executor.Wait(context.Background())
}

// TestExecutor_SignalCatching verifies that a SIGTERM delivered to the
// executor's signal channel shuts down the running container instead of
// leaving the task orphaned.
func TestExecutor_SignalCatching(t *testing.T) {
	ci.Parallel(t)

	// Requires exec-driver (root) and cgroups support on the host.
	testutil.ExecCompatible(t)
	testutil.CgroupsCompatible(t)

	testExecCmd := testExecutorCommandWithChroot(t)

	allocDir := testExecCmd.allocDir
	defer allocDir.Destroy()

	execCmd := testExecCmd.command
	execCmd.Cmd = "/bin/sleep"
	execCmd.Args = []string{"100"}
	execCmd.ResourceLimits = true
	execCmd.ModePID = "private"
	execCmd.ModeIPC = "private"

	execInterface := NewExecutorWithIsolation(testlog.HCLogger(t), compute)

	ps, err := execInterface.Launch(execCmd)
	must.NoError(t, err)
	must.Positive(t, ps.Pid)

	executor := execInterface.(*LibcontainerExecutor)
	status, err := executor.container.OCIState()
	must.NoError(t, err)
	must.Eq(t, specs.StateRunning, status.Status)

	// Deliver SIGTERM through the executor's signal channel; catchSignals
	// should react by shutting the container down.
	executor.sigChan <- syscall.SIGTERM

	// Poll for the stopped state instead of sleeping a fixed interval: a
	// fixed sleep is flaky on slow machines and wastes time on fast ones.
	deadline := time.Now().Add(10 * time.Second)
	for {
		status, err = executor.container.OCIState()
		must.NoError(t, err)
		if status.Status == specs.StateStopped || time.Now().After(deadline) {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	must.Eq(t, specs.StateStopped, status.Status)
}

0 comments on commit 8601900

Please sign in to comment.