Improve NodeInfo RunId calculation to handle NodeIdLabel (#2087)
* Improve NodeInfo RunId calculation to handle NodeIdLabel

The server now expects us to report RunIds on a node if either:
 - The job is on the node and not "finished"
   - The legacy scheduler considers any pod that is no longer using resources as finished
   - The ExecutorApi considers any pod that it has told us to delete as finished
 - The job is not yet assigned to a node but has a node selector matching the node (using NodeIdLabel to match); see the sketch below
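
In rough Go terms the rule is the predicate sketched below. This is illustrative only: shouldReportRunOnNode and isFinished are made-up names standing in for the scheduler-specific checks; the real logic lives in getRunIdsByNode in cluster_utilisation.go.

import v1 "k8s.io/api/core/v1"

// Sketch: should this pod's run be reported against this node?
// isFinished stands in for the legacy/ExecutorApi "finished" checks above.
func shouldReportRunOnNode(pod *v1.Pod, node *v1.Node, nodeIdLabel string, isFinished func(*v1.Pod) bool) bool {
	if isFinished(pod) {
		return false
	}
	if pod.Spec.NodeName == node.Name {
		return true // assigned to this node and not finished
	}
	if pod.Spec.NodeName == "" {
		// Not yet assigned: match the node via a node selector on the NodeIdLabel.
		selector, ok := pod.Spec.NodeSelector[nodeIdLabel]
		return ok && selector == node.Labels[nodeIdLabel]
	}
	return false
}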

* wip

* Use schedulerobjects JobRunState everywhere

* Fix tests

* Tidy up

* gofumpt

* missed change

* Fix PR

---------

Co-authored-by: Albin Severinson <albin@severinson.org>
JamesMurkin and severinson committed Feb 8, 2023
1 parent 2da3dae commit 99c89e3
Showing 23 changed files with 961 additions and 452 deletions.
9 changes: 9 additions & 0 deletions client/DotNet/Armada.Client/ClientGenerated.cs
@@ -1756,6 +1756,15 @@ public enum ApiJobState
[System.Runtime.Serialization.EnumMember(Value = @"RUNNING")]
RUNNING = 2,

[System.Runtime.Serialization.EnumMember(Value = @"SUCCEEDED")]
SUCCEEDED = 3,

[System.Runtime.Serialization.EnumMember(Value = @"FAILED")]
FAILED = 4,

[System.Runtime.Serialization.EnumMember(Value = @"UNKNOWN")]
UNKNOWN = 5,

}

[System.CodeDom.Compiler.GeneratedCode("NJsonSchema", "10.0.27.0 (Newtonsoft.Json v12.0.0.0)")]
6 changes: 6 additions & 0 deletions client/python/tests/unit/test_gen.py
@@ -146,13 +146,19 @@ class JobState(Enum)
QUEUED = 0
PENDING = 1
RUNNING = 2
SUCCEEDED = 3
FAILED = 4
UNKNOWN = 5
'''

expected_jobstates = [
("QUEUED", 0),
("PENDING", 1),
("RUNNING", 2),
("SUCCEEDED", 3),
("FAILED", 4),
("UNKNOWN", 5),
]


1 change: 1 addition & 0 deletions config/executor/config.yaml
@@ -31,6 +31,7 @@ kubernetes:
minimumAvailable: 2
QPS: 10000
Burst: 10000
nodeIdLabel: kubernetes.io/hostname
minimumPodAge: 3m
failedPodExpiry: 10m
maxTerminatedPods: 1000 # Should be lower than kube-controller-manager's terminated-pod-gc-threshold (default 12500)
1 change: 1 addition & 0 deletions internal/executor/application.go
@@ -144,6 +144,7 @@ func StartUpWithContext(
nodeInfoService,
usageClient,
config.Kubernetes.TrackedNodeLabels,
config.Kubernetes.NodeIdLabel,
config.Kubernetes.NodeReservedResources,
)

1 change: 1 addition & 0 deletions internal/executor/configuration/types.go
@@ -42,6 +42,7 @@ type KubernetesConfiguration struct {
QPS float32
Burst int
Etcd EtcdConfiguration
NodeIdLabel string
TrackedNodeLabels []string
AvoidNodeLabelsOnRetry []string
ToleratedTaints []string
69 changes: 59 additions & 10 deletions internal/executor/utilisation/cluster_utilisation.go
@@ -5,11 +5,9 @@ import (
"time"

"github.com/pkg/errors"

"k8s.io/apimachinery/pkg/api/resource"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
@@ -32,6 +30,7 @@ type ClusterUtilisationService struct {
nodeInfoService node.NodeInfoService
usageClient api.UsageClient
trackedNodeLabels []string
nodeIdLabel string
nodeReservedResources armadaresource.ComputeResources
}

@@ -41,6 +40,7 @@ func NewClusterUtilisationService(
nodeInfoService node.NodeInfoService,
usageClient api.UsageClient,
trackedNodeLabels []string,
nodeIdLabel string,
nodeReservedResources armadaresource.ComputeResources,
) *ClusterUtilisationService {
return &ClusterUtilisationService{
@@ -49,6 +49,7 @@
nodeInfoService: nodeInfoService,
usageClient: usageClient,
trackedNodeLabels: trackedNodeLabels,
nodeIdLabel: nodeIdLabel,
nodeReservedResources: nodeReservedResources,
}
}
@@ -140,6 +141,7 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterC
nodesUsage := getAllocatedResourceByNodeName(allNonCompletePodsRequiringResource)
podsByNodes := groupPodsByNodes(allNonCompletePodsRequiringResource)
nodes := make([]api.NodeInfo, 0, len(processingNodes))
runIdsByNode := clusterUtilisationService.getRunIdsByNode(processingNodes, allPods, useLegacyIds)
for _, n := range processingNodes {
allocatable := armadaresource.FromResourceList(n.Status.Allocatable)
available := allocatable.DeepCopy()
@@ -159,7 +161,7 @@ func (clusterUtilisationService *ClusterUtilisationService) GetAvailableClusterC
AvailableResources: available,
TotalResources: allocatable,
AllocatedResources: allocated,
RunIds: getRunIds(nodePods, useLegacyIds),
RunIdsByState: runIdsByNode[n.Name],
})
}

@@ -169,13 +171,60 @@
}, nil
}

// Removed:
// This is required until we transition to the Executor API
// The server api expects job ids whereas the executor api expects run ids
func getRunIds(pods []*v1.Pod, useLegacyIds bool) []string {
	if useLegacyIds {
		return util.ExtractJobIds(pods)
	}
	return util.ExtractJobRunIds(pods)
}

// Added:
// This returns all the pods assigned to the node, or soon to be assigned to it (via a node selector on nodeIdLabel)
// The server api expects job ids, the executor api expects run ids - the legacy flag controls which this returns
func (clusterUtilisationService *ClusterUtilisationService) getRunIdsByNode(nodes []*v1.Node, pods []*v1.Pod, legacy bool) map[string]map[string]api.JobState {
nodeIdToNodeName := make(map[string]string, len(nodes))
for _, n := range nodes {
if nodeId, nodeIdPresent := n.Labels[clusterUtilisationService.nodeIdLabel]; nodeIdPresent {
nodeIdToNodeName[nodeId] = n.Name
}
}
noLongerNeedsReportingFunc := util.IsReportedDone

result := map[string]map[string]api.JobState{}
for _, pod := range pods {
// Skip pods that are not Armada pods, or that are "complete" from the server's point of view
if !util.IsManagedPod(pod) || noLongerNeedsReportingFunc(pod) {
continue
}
nodeIdNodeSelector, nodeSelectorPresent := pod.Spec.NodeSelector[clusterUtilisationService.nodeIdLabel]
runId := util.ExtractJobRunId(pod)
if legacy {
runId = util.ExtractJobId(pod)
}

nodeName := pod.Spec.NodeName
if nodeName == "" && nodeSelectorPresent {
targetedNodeName, present := nodeIdToNodeName[nodeIdNodeSelector]
// Not scheduled on a node, but has node selector matching the current node
if present {
nodeName = targetedNodeName
}
}

if nodeName != "" {
if _, present := result[nodeName]; !present {
result[nodeName] = map[string]api.JobState{}
}
result[nodeName][runId] = getJobRunState(pod)
}
}
return result
}

func getJobRunState(pod *v1.Pod) api.JobState {
switch {
case pod.Status.Phase == v1.PodPending:
return api.JobState_PENDING
case pod.Status.Phase == v1.PodRunning:
return api.JobState_RUNNING
case pod.Status.Phase == v1.PodSucceeded:
return api.JobState_SUCCEEDED
case pod.Status.Phase == v1.PodFailed:
return api.JobState_FAILED
}
return api.JobState_UNKNOWN
}
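
The net effect on the wire is that each reported node now carries a state per run id instead of a bare id list - roughly the shape below (illustrative values only, mirroring the RunIdsByState field used in the hunk above):

// Illustrative: the flat RunIds []string field is replaced by RunIdsByState.
nodeInfo := api.NodeInfo{
	Name: "node-1",
	RunIdsByState: map[string]api.JobState{
		"run-1": api.JobState_RUNNING,
		"run-2": api.JobState_PENDING, // targeted via nodeIdLabel selector, not yet bound to the node
	},
}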

func getAllocatedResourceByNodeName(pods []*v1.Pod) map[string]armadaresource.ComputeResources {
105 changes: 105 additions & 0 deletions internal/executor/utilisation/cluster_utilisation_test.go
@@ -11,8 +11,11 @@ import (
armadaresource "github.com/armadaproject/armada/internal/common/resource"
util2 "github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/pkg/api"
)

const nodeIdLabel = "node-id"

func TestCreateReportsOfQueueUsages(t *testing.T) {
utilisationService := &ClusterUtilisationService{
queueUtilisationService: NewPodUtilisationService(nil, nil, nil, nil),
@@ -304,6 +307,108 @@ func TestGetAllocationByQueueAndPriority_AggregatesResources(t *testing.T) {
assert.Equal(t, result, expectedResult)
}

func TestGetRunIdsByNode(t *testing.T) {
utilisationService := &ClusterUtilisationService{
nodeIdLabel: nodeIdLabel,
}
node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1", Labels: map[string]string{nodeIdLabel: "node-1-id"}}}
node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-2", Labels: map[string]string{nodeIdLabel: "node-2-id"}}}

tests := map[string]struct {
inputPods []*v1.Pod
legacyIds bool
expectedOutput map[string]map[string]api.JobState
}{
"MatchesOnNodeName": {
inputPods: []*v1.Pod{
createPodOnNode("job-1", "run-1", v1.PodRunning, "node-1", ""),
createPodOnNode("job-2", "run-2", v1.PodRunning, "node-1", ""),
createPodOnNode("job-3", "run-3", v1.PodRunning, "node-2", ""),
},
expectedOutput: map[string]map[string]api.JobState{
"node-1": {"run-1": api.JobState_RUNNING, "run-2": api.JobState_RUNNING},
"node-2": {"run-3": api.JobState_RUNNING},
},
},
"LegacyGivesJobIds": {
inputPods: []*v1.Pod{
createPodOnNode("job-1", "run-1", v1.PodRunning, "node-1", ""),
createPodOnNode("job-2", "run-2", v1.PodRunning, "node-2", ""),
},
expectedOutput: map[string]map[string]api.JobState{
"node-1": {"job-1": api.JobState_RUNNING},
"node-2": {"job-2": api.JobState_RUNNING},
},
legacyIds: true,
},
"HandlesAllPodPhases": {
inputPods: []*v1.Pod{
createPodOnNode("job-1", "run-1", v1.PodPending, "node-1", ""),
createPodOnNode("job-2", "run-2", v1.PodRunning, "node-1", ""),
createPodOnNode("job-3", "run-3", v1.PodSucceeded, "node-1", ""),
createPodOnNode("job-4", "run-4", v1.PodFailed, "node-1", ""),
},
expectedOutput: map[string]map[string]api.JobState{
"node-1": {"run-1": api.JobState_PENDING, "run-2": api.JobState_RUNNING, "run-3": api.JobState_SUCCEEDED, "run-4": api.JobState_FAILED},
},
},
"PodWithNodeSelectorTargetingNode": {
inputPods: []*v1.Pod{
// Node selector matches node-1 label
createPodOnNode("job-1", "run-1", v1.PodPending, "", "node-1-id"),
},
expectedOutput: map[string]map[string]api.JobState{
"node-1": {"run-1": api.JobState_PENDING},
},
},
"PodWithNodeSelectorTargetingInvalidNode": {
inputPods: []*v1.Pod{
// Node selector does not match any node
createPodOnNode("job-1", "run-1", v1.PodPending, "", "node-3-id"),
},
// No matches
expectedOutput: map[string]map[string]api.JobState{},
},
"Mixed": {
inputPods: []*v1.Pod{
createPodOnNode("job-1", "run-1", v1.PodRunning, "node-1", ""),
createPodOnNode("job-2", "run-2", v1.PodPending, "", "node-1-id"),
},
expectedOutput: map[string]map[string]api.JobState{
"node-1": {"run-1": api.JobState_RUNNING, "run-2": api.JobState_PENDING},
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
result := utilisationService.getRunIdsByNode([]*v1.Node{node1, node2}, tc.inputPods, tc.legacyIds)
assert.Equal(t, tc.expectedOutput, result)
})
}
}

func createPodOnNode(jobId string, runId string, phase v1.PodPhase, nodeName string, nodeIdSelector string) *v1.Pod {
pod := &v1.Pod{
Status: v1.PodStatus{
Phase: phase,
},
Spec: v1.PodSpec{
NodeName: nodeName,
},
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
domain.JobId: jobId,
domain.JobRunId: runId,
},
},
}
if nodeIdSelector != "" {
pod.Spec.NodeSelector = map[string]string{nodeIdLabel: nodeIdSelector}
}
return pod
}

func TestGetAllocationByQueueAndPriority_HandlesEmptyList(t *testing.T) {
var pods []*v1.Pod

2 changes: 1 addition & 1 deletion internal/scheduler/api.go
@@ -206,7 +206,7 @@ func extractRunIds(req *executorapi.LeaseRequest) ([]uuid.UUID, error) {
runIds := make([]uuid.UUID, 0)
// add all runids from nodes
for _, node := range req.Nodes {
for _, runIdStr := range node.RunIds {
for runIdStr := range node.RunIdsByState {
runId, err := uuid.Parse(runIdStr)
if err != nil {
return nil, errors.WithStack(err)
11 changes: 7 additions & 4 deletions internal/scheduler/api_test.go
@@ -38,8 +38,11 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Pool: "test-pool",
Nodes: []*api.NodeInfo{
{
Name: "test-node",
RunIds: []string{runId1.String(), runId2.String()},
Name: "test-node",
RunIdsByState: map[string]api.JobState{
runId1.String(): api.JobState_RUNNING,
runId2.String(): api.JobState_RUNNING,
},
},
},
UnassignedJobRunIds: []armadaevents.Uuid{*armadaevents.ProtoUuidFromUuid(runId3)},
@@ -51,7 +54,7 @@
{
Id: "test-executor-test-node",
TotalResources: schedulerobjects.ResourceList{},
JobRuns: []string{runId1.String(), runId2.String()},
JobRunsByState: map[string]schedulerobjects.JobRunState{runId1.String(): schedulerobjects.JobRunState_RUNNING, runId2.String(): schedulerobjects.JobRunState_RUNNING},
AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{
1000: {
Resources: map[string]resource.Quantity{},
@@ -144,7 +147,7 @@
assert.Equal(t, tc.expectedExecutor, executor)
return nil
}).Times(1)
mockJobRepository.EXPECT().FindInactiveRuns(gomock.Any(), runIds).Return(tc.runsToCancel, nil).Times(1)
mockJobRepository.EXPECT().FindInactiveRuns(gomock.Any(), schedulermocks.SliceMatcher[uuid.UUID]{Expected: runIds}).Return(tc.runsToCancel, nil).Times(1)
mockJobRepository.EXPECT().FetchJobRunLeases(gomock.Any(), tc.request.ExecutorId, maxJobsPerCall, runIds).Return(tc.leases, nil).Times(1)

// capture all sent messages
45 changes: 45 additions & 0 deletions internal/scheduler/mocks/matchers.go
@@ -0,0 +1,45 @@
package schedulermocks

import (
"sort"
"strings"
)

type Stringer interface {
String() string
}

type SliceMatcher[T Stringer] struct {
Expected []T
}

// Matches reports whether the input matches the expected values.
// Matching ignores ordering, so args don't need to be passed in a known order.
func (s SliceMatcher[T]) Matches(x interface{}) bool {
inputs, ok := x.([]T)
if !ok {
return false
}
expected := s.Expected
if len(inputs) != len(expected) {
return false
}
sort.Slice(inputs, func(i, j int) bool {
return strings.Compare(inputs[i].String(), inputs[j].String()) < 0
})
sort.Slice(expected, func(i, j int) bool {
return strings.Compare(expected[i].String(), expected[j].String()) < 0
})
for i, inputValue := range inputs {
if inputValue.String() != expected[i].String() {
return false
}
}
return true
}

// String describes what the matcher matches.
func (s SliceMatcher[T]) String() string {
return "checks provided matches expected uuid list"
}
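
For illustration, the matcher slots into a gomock expectation like the sketch below. It mirrors the FindInactiveRuns call in api_test.go above; the variable names and return values are assumed from that test, not part of this file.

// Sketch: order-insensitive matching of the uuid slice passed to the mock.
runIds := []uuid.UUID{runId1, runId2}
mockJobRepository.EXPECT().
	FindInactiveRuns(gomock.Any(), schedulermocks.SliceMatcher[uuid.UUID]{Expected: runIds}).
	Return(nil, nil).
	Times(1)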
2 changes: 1 addition & 1 deletion internal/scheduler/schedulerobjects/executor.go
@@ -9,7 +9,7 @@ func (m *Executor) AllRuns() ([]uuid.UUID, error) {
runIds := make([]uuid.UUID, 0)
// add all runids from nodes
for _, node := range m.Nodes {
for _, runIdStr := range node.JobRuns {
for runIdStr := range node.JobRunsByState {
runId, err := uuid.Parse(runIdStr)
if err != nil {
return nil, errors.WithStack(err)
