Scheduler plumbing necessary for preemption to fair share (#2059)
* Update

* Refactoring

* Update

* Remove unnecessary generics

* Fix tests

* Refactor

* Fix tests

* Fix tests

* Address comments

* Fix tests

* Add MultiJobsIterator

* Store node name by run id

* Add NodeIdLabel config

* Add nodeIdLabel config and optimisations

* Lint

* Fix tests

* Avoid panic

* Track nodeIdLabel

* Add job ids to scheduler node

* Avoid panic

* Test improvements

* Add targetNodeIdAnnotation

* Make targetNodeIdAnnotation a const

* Cleanup

* Add JobIdAnnotation const

* Cleanup

* Fix tests

* Add missing annotation

* Lint
severinson committed Jan 25, 2023
1 parent 6b27ffe commit 5e30e3f
Showing 21 changed files with 1,188 additions and 1,082 deletions.
2 changes: 1 addition & 1 deletion config/executor/config.yaml
@@ -16,7 +16,7 @@ task:
utilisationEventProcessingInterval: 1s
utilisationEventReportingInterval: 5m
apiConnection:
armadaUrl : "server:50051"
armadaUrl: "server:50051"
client:
maxMessageSizeBytes: 8388608 # 1024 * 1024 * 8
metric:
6 changes: 3 additions & 3 deletions internal/armada/repository/job_test.go
@@ -618,18 +618,18 @@ func TestUpdateJobs_SingleJobThatExists_ChangesJob(t *testing.T) {
newSchedName := "custom"

results, err := r.UpdateJobs([]string{job1.Id}, func(jobs []*api.Job) {
assert.Equal(t, 1, len(jobs))
require.Equal(t, 1, len(jobs))
jobs[0].PodSpec.SchedulerName = newSchedName
})
require.NoError(t, err)

assert.Equal(t, 1, len(results))
require.Equal(t, 1, len(results))
assert.Nil(t, results[0].Error)
assert.Equal(t, job1.Id, results[0].JobId)

reloadedJobs, err := r.GetExistingJobsByIds([]string{job1.Id})
require.NoError(t, err)
assert.Equal(t, 1, len(reloadedJobs))
require.Equal(t, 1, len(reloadedJobs))
assert.Equal(t, newSchedName, reloadedJobs[0].PodSpec.SchedulerName)
assert.Equal(t, results[0].Job, reloadedJobs[0])
})
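The switch from assert.Equal to require.Equal on the length checks is not cosmetic: testify's assert records a failure but lets the test continue, so an unexpectedly empty slice would still be indexed and panic, whereas require aborts the test immediately. A minimal standalone sketch of the difference (a hypothetical test, not part of this commit; on a mismatch it fails cleanly instead of panicking):

package repository_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestRequireStopsBeforeIndexing(t *testing.T) {
	results := []string{}             // imagine the repository returned nothing
	require.Equal(t, 1, len(results)) // fails and returns here on a mismatch
	_ = results[0]                    // never reached, so no index-out-of-range panic
}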
13 changes: 10 additions & 3 deletions internal/armada/server/lease.go
@@ -278,14 +278,21 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
}

// Nodes to be considered by the scheduler.
nodes := make([]*schedulerobjects.Node, len(req.Nodes))
for i, nodeInfo := range req.Nodes {
nodes[i] = api.NewNodeFromNodeInfo(
nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
for _, nodeInfo := range req.Nodes {
node, err := api.NewNodeFromNodeInfo(
&nodeInfo,
req.ClusterId,
priorities,
time.Now(),
)
if err != nil {
logging.WithStacktrace(log, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetClusterId(),
)
} else {
nodes = append(nodes, node)
}
}
indexedResources := q.schedulingConfig.IndexedResources
if len(indexedResources) == 0 {
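Because NewNodeFromNodeInfo now returns an error and invalid nodes are skipped with a logged warning, the result slice is built with make([]*schedulerobjects.Node, 0, len(req.Nodes)) and append rather than a full-length slice with index assignment; the old pattern would leave nil entries wherever a node was skipped. A standalone sketch of the difference, using placeholder types rather than Armada's:

package main

import "fmt"

type nodeInfo struct{ name string }

type node struct{ name string }

// convert stands in for a fallible conversion such as NewNodeFromNodeInfo.
func convert(info nodeInfo) (*node, error) {
	if info.name == "" {
		return nil, fmt.Errorf("node has no name")
	}
	return &node{name: info.name}, nil
}

func main() {
	infos := []nodeInfo{{name: "a"}, {name: ""}, {name: "c"}}

	// Old pattern: full-length slice plus index assignment leaves a nil hole
	// wherever an entry is skipped.
	withHoles := make([]*node, len(infos))
	for i, info := range infos {
		if n, err := convert(info); err == nil {
			withHoles[i] = n
		}
	}
	fmt.Println(len(withHoles), withHoles[1] == nil) // 3 true

	// New pattern: zero length, full capacity, append only the successes.
	compact := make([]*node, 0, len(infos))
	for _, info := range infos {
		if n, err := convert(info); err == nil {
			compact = append(compact, n)
		}
	}
	fmt.Println(len(compact)) // 2
}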
3 changes: 2 additions & 1 deletion internal/scheduler/adapters/adapters.go
@@ -2,6 +2,7 @@ package adapters

import (
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

"github.com/armadaproject/armada/internal/armada/configuration"
@@ -11,7 +12,7 @@ import (

func PodRequirementsFromPod(pod *v1.Pod, priorityByPriorityClassName map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
rv := PodRequirementsFromPodSpec(&pod.Spec, priorityByPriorityClassName)
rv.Annotations = pod.Annotations
rv.Annotations = maps.Clone(pod.Annotations)
return rv
}

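The new maps.Clone call gives the PodRequirements their own copy of the pod's annotations, so later scheduler-side annotation writes cannot leak back into the original pod object (and vice versa). A standalone sketch of the aliasing problem the clone avoids:

package main

import (
	"fmt"

	"golang.org/x/exp/maps"
)

func main() {
	// Aliasing: assigning a map copies only the map header, so writes through
	// the "requirements" side mutate the pod's annotations too.
	podAnnotations := map[string]string{"team": "analytics"}
	aliased := podAnnotations
	aliased["armadaproject.io/jobId"] = "some-job-id"
	fmt.Printf("%q\n", podAnnotations["armadaproject.io/jobId"]) // "some-job-id"

	// Cloning keeps later writes local to the scheduler-side copy.
	podAnnotations = map[string]string{"team": "analytics"}
	cloned := maps.Clone(podAnnotations)
	cloned["armadaproject.io/jobId"] = "some-job-id"
	fmt.Printf("%q\n", podAnnotations["armadaproject.io/jobId"]) // ""
}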
25 changes: 17 additions & 8 deletions internal/scheduler/api.go
@@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/database"
@@ -55,7 +56,8 @@ func NewExecutorApi(producer pulsar.Producer,
// - Determines if any of the job runs in the request are no longer active and should be cancelled
// - Determines if any new job runs should be leased to the executor
func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRunsServer) error {
log := ctxlogrus.Extract(stream.Context())
ctx := stream.Context()
log := ctxlogrus.Extract(ctx)
// Receive once to get info necessary to get jobs to lease.
req, err := stream.Recv()
if err != nil {
@@ -65,9 +67,8 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
log.Infof("Handling lease request for executor %s", req.ExecutorId)

// store the executor state for use by the scheduler
executorState := srv.createExecutorState(req)
err = srv.executorRepository.StoreExecutor(stream.Context(), executorState)
if err != nil {
executorState := srv.createExecutorState(ctx, req)
if err = srv.executorRepository.StoreExecutor(stream.Context(), executorState); err != nil {
return err
}

@@ -158,10 +159,18 @@ func (srv *ExecutorApi) ReportEvents(ctx context.Context, list *executorapi.Even
}

// createExecutorState extracts a schedulerobjects.Executor from the request
func (srv *ExecutorApi) createExecutorState(req *executorapi.LeaseRequest) *schedulerobjects.Executor {
nodes := make([]*schedulerobjects.Node, len(req.Nodes))
for i, nodeInfo := range req.Nodes {
nodes[i] = api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, srv.clock.Now().UTC())
func (srv *ExecutorApi) createExecutorState(ctx context.Context, req *executorapi.LeaseRequest) *schedulerobjects.Executor {
log := ctxlogrus.Extract(ctx)
nodes := make([]*schedulerobjects.Node, 0, len(req.Nodes))
for _, nodeInfo := range req.Nodes {
node, err := api.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, srv.clock.Now().UTC())
if err != nil {
logging.WithStacktrace(log, err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
)
} else {
nodes = append(nodes, node)
}
}
return &schedulerobjects.Executor{
Id: req.ExecutorId,
9 changes: 6 additions & 3 deletions internal/scheduler/api_test.go
@@ -140,7 +140,8 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
capturedEvents = append(capturedEvents, msg)
}).AnyTimes()

server := NewExecutorApi(mockPulsarProducer,
server := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
@@ -208,11 +209,13 @@ func TestExecutorApi_Publish(t *testing.T) {
callback(pulsarutils.NewMessageId(1), msg, nil)
}).AnyTimes()

server := NewExecutorApi(mockPulsarProducer,
server := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
100)
100,
)

empty, err := server.ReportEvents(ctx, &executorapi.EventList{Events: tc.sequences})
require.NoError(t, err)
8 changes: 8 additions & 0 deletions internal/scheduler/config.go
@@ -9,6 +9,14 @@ import (
grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration"
)

const (
// If set on a pod, the value of this annotation is interpreted as the id of a node
// and only the node with that id will be considered for scheduling the pod.
TargetNodeIdAnnotation = "armadaproject.io/targetNodeId"
// If set on a pod, indicates which job this pod is part of.
JobIdAnnotation = "armadaproject.io/jobId"
)

type Configuration struct {
// Database configuration
Postgres configuration.PostgresConfig
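The comments above define the contract for the two new annotation keys: a pod carrying armadaproject.io/targetNodeId may only be scheduled onto the node with that id, and armadaproject.io/jobId ties a pod back to its job. A standalone sketch of the target-node filtering rule (the helper and types are hypothetical, not part of this commit):

package main

import "fmt"

// Mirrors the TargetNodeIdAnnotation constant defined above; a local copy so
// the sketch is self-contained.
const targetNodeIdAnnotation = "armadaproject.io/targetNodeId"

type node struct{ id string }

// candidateNodes is a hypothetical helper illustrating the contract: an
// annotated pod may only be considered for the node with the given id.
func candidateNodes(annotations map[string]string, nodesById map[string]*node) []*node {
	if targetId, ok := annotations[targetNodeIdAnnotation]; ok {
		if n, ok := nodesById[targetId]; ok {
			return []*node{n}
		}
		return nil // target node unknown to this executor
	}
	// No target annotation: every node is a candidate.
	nodes := make([]*node, 0, len(nodesById))
	for _, n := range nodesById {
		nodes = append(nodes, n)
	}
	return nodes
}

func main() {
	nodesById := map[string]*node{"node-1": {id: "node-1"}, "node-2": {id: "node-2"}}
	pinned := map[string]string{targetNodeIdAnnotation: "node-2"}
	fmt.Println(len(candidateNodes(pinned, nodesById)))              // 1
	fmt.Println(len(candidateNodes(map[string]string{}, nodesById))) // 2
}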
43 changes: 20 additions & 23 deletions internal/scheduler/legacyscheduler.go
@@ -36,7 +36,6 @@ type LegacySchedulerJob interface {
// SchedulingConstraints collects scheduling constraints,
// e.g., per-queue resource limits.
type SchedulingConstraints struct {
Priorities []int32
PriorityClasses map[string]configuration.PriorityClass
// Executor for which we're currently scheduling jobs.
ExecutorId string
@@ -72,26 +71,19 @@ func SchedulingConstraintsFromSchedulingConfig(
config configuration.SchedulingConfig,
totalResources schedulerobjects.ResourceList,
) *SchedulingConstraints {
priorities := make([]int32, 0)
maximalCumulativeResourceFractionPerQueueAndPriority := make(map[int32]map[string]float64, 0)
for _, priority := range config.Preemption.PriorityClasses {
// priorities = append(priorities, priority.Priority)
maximalCumulativeResourceFractionPerQueueAndPriority[priority.Priority] = priority.MaximalResourceFractionPerQueue
}
if len(priorities) == 0 {
priorities = []int32{0}
}
return &SchedulingConstraints{
Priorities: priorities,
PriorityClasses: config.Preemption.PriorityClasses,
ExecutorId: executorId,
Pool: pool,
ResourceScarcity: config.GetResourceScarcity(pool),

MaximumJobsToSchedule: config.MaximumJobsToSchedule,
MaxConsecutiveUnschedulableJobs: config.QueueLeaseBatchSize,
MinimumJobSize: minimumJobSize,
MaximalResourceFractionPerQueue: config.MaximalResourceFractionPerQueue,
PriorityClasses: config.Preemption.PriorityClasses,
ExecutorId: executorId,
Pool: pool,
ResourceScarcity: config.GetResourceScarcity(pool),
MaximumJobsToSchedule: config.MaximumJobsToSchedule,
MaxConsecutiveUnschedulableJobs: config.QueueLeaseBatchSize,
MinimumJobSize: minimumJobSize,
MaximalResourceFractionPerQueue: config.MaximalResourceFractionPerQueue,
MaximalCumulativeResourceFractionPerQueueAndPriority: maximalCumulativeResourceFractionPerQueueAndPriority,
MaximalResourceFractionToSchedulePerQueue: config.MaximalResourceFractionToSchedulePerQueue,
MaximalResourceFractionToSchedule: config.MaximalClusterFractionToSchedule,
@@ -671,10 +663,6 @@ type LegacyScheduler struct {
// Contains all nodes to be considered for scheduling.
// Used for matching pods with nodes.
NodeDb *NodeDb
// Jobs are grouped into gangs by this annotation.
GangIdAnnotation string
// Jobs in a gang specify the number of jobs in the gang via this annotation.
GangCardinalityAnnotation string
}

func (sched *LegacyScheduler) String() string {
@@ -780,7 +768,6 @@ func NewLegacyScheduler(
if err != nil {
return nil, err
}

return &LegacyScheduler{
ctx: ctx,
SchedulingConstraints: constraints,
@@ -816,7 +803,6 @@ func (sched *LegacyScheduler) Schedule() ([]LegacySchedulerJob, error) {
for i, r := range reports {
jobs[i] = r.Job
}

reqs := PodRequirementsFromLegacySchedulerJobs(jobs, sched.PriorityClasses)
podSchedulingReports, ok, err := sched.NodeDb.ScheduleMany(reqs)
if err != nil {
@@ -961,7 +947,18 @@ func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E LegacySchedulerJob](jobs S
rv := make([]*schedulerobjects.PodRequirements, 0, len(jobs))
for _, job := range jobs {
info := job.GetRequirements(priorityClasses)
rv = append(rv, PodRequirementFromJobSchedulingInfo(info))
req := PodRequirementFromJobSchedulingInfo(info)
if _, ok := req.Annotations[JobIdAnnotation]; !ok {
// Auto-populate JobIdAnnotation if not set.
if req.Annotations == nil {
req.Annotations = map[string]string{
JobIdAnnotation: job.GetId(),
}
} else {
req.Annotations[JobIdAnnotation] = job.GetId()
}
}
rv = append(rv, req)
}
return rv
}
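Auto-populating JobIdAnnotation means any component that only sees a pod's requirements or annotations, such as the scheduler node records this commit extends with job ids, can recover the owning job. A standalone sketch of that reverse lookup (hypothetical helper, not part of this commit):

package main

import "fmt"

// Mirrors the JobIdAnnotation constant defined in internal/scheduler/config.go.
const jobIdAnnotation = "armadaproject.io/jobId"

// jobIdFromAnnotations is a hypothetical helper: it recovers the owning job id
// from annotations that have been auto-populated as above.
func jobIdFromAnnotations(annotations map[string]string) (string, bool) {
	if annotations == nil {
		return "", false
	}
	jobId, ok := annotations[jobIdAnnotation]
	return jobId, ok
}

func main() {
	annotations := map[string]string{jobIdAnnotation: "example-job-id"}
	if jobId, ok := jobIdFromAnnotations(annotations); ok {
		fmt.Println("pod belongs to job", jobId)
	}
}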
