First Round Of Fixes From E2E Scheduling Testing (#2072)
* initial commit

* wip

* linting

* fix tests

* linting

---------

Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
d80tb7 and Chris Martin committed Jan 31, 2023
1 parent 9815a53 commit 3b9ac8a
Showing 18 changed files with 153 additions and 79 deletions.
7 changes: 5 additions & 2 deletions cmd/scheduler/cmd/root.go
Expand Up @@ -21,10 +21,13 @@ func RootCmd() *cobra.Command {
}

cmd.PersistentFlags().StringSlice(
"armadaUrl",
CustomConfigLocation,
[]string{},
"Fully qualified path to application configuration file (for multiple config files repeat this arg or separate paths with commas)")

err := viper.BindPFlag(CustomConfigLocation, cmd.PersistentFlags().Lookup(CustomConfigLocation))
if err != nil {
panic(err)
}
cmd.AddCommand(
runCmd(),
migrateDbCmd(),
Expand Down
6 changes: 6 additions & 0 deletions config/scheduler/config.yaml
Expand Up @@ -9,6 +9,12 @@ pulsar:
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
redis:
addrs:
- redis:6379
password: ""
db: 0
poolSize: 1000
postgres:
connection:
host: postgres
Expand Down
7 changes: 7 additions & 0 deletions config/scheduleringester/config.yaml
Expand Up @@ -22,3 +22,10 @@ pulsar:
subscriptionName: "scheduler-ingester"
batchSize: 10000
batchDuration: 500ms
priorityClasses:
armada-default:
priority: 1000
armada-preemptible:
priority: 900


5 changes: 4 additions & 1 deletion internal/armada/server.go
Expand Up @@ -76,7 +76,10 @@ func Serve(ctx context.Context, config *configuration.ArmadaConfig, healthChecks
// We support multiple simultaneous authentication services (e.g., username/password OpenId).
// For each gRPC request, we try them all until one succeeds, at which point the process is
// short-circuited.
authServices := auth.ConfigureAuth(config.Auth)
authServices, err := auth.ConfigureAuth(config.Auth)
if err != nil {
return err
}
grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices)

// Shut down grpcServer if the context is cancelled.
Expand Down
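
The comment retained in the hunk above describes how the configured auth services are chained: each gRPC request is offered to every service in turn, and the first success short-circuits the rest. Below is a minimal, self-contained sketch of that pattern; the Principal and AuthService types and the authenticate helper are simplified stand-ins for illustration only, not the actual types in internal/common/auth/authorization.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Principal and AuthService are simplified stand-ins for the types in
// internal/common/auth/authorization; the real interfaces differ.
type Principal struct{ Name string }

type AuthService interface {
	Authenticate(ctx context.Context) (*Principal, error)
}

// errMissingCredentials signals "this service has nothing to say about the
// request", so the next service in the chain is tried.
var errMissingCredentials = errors.New("missing credentials")

// authenticate tries each service in order and short-circuits on the first
// success, mirroring the behaviour described in the server.go comment.
func authenticate(ctx context.Context, services []AuthService) (*Principal, error) {
	for _, svc := range services {
		principal, err := svc.Authenticate(ctx)
		if err == nil {
			return principal, nil // first success wins
		}
		if errors.Is(err, errMissingCredentials) {
			continue // this service cannot handle the request; try the next one
		}
		return nil, err // a hard failure (e.g. bad password) stops the chain
	}
	return nil, errors.New("request could not be authenticated by any configured service")
}

// anonymousAuth accepts every request; used here only to make the sketch runnable.
type anonymousAuth struct{}

func (anonymousAuth) Authenticate(_ context.Context) (*Principal, error) {
	return &Principal{Name: "anonymous"}, nil
}

func main() {
	principal, err := authenticate(context.Background(), []AuthService{anonymousAuth{}})
	fmt.Println(principal.Name, err)
}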
8 changes: 7 additions & 1 deletion internal/binoculars/server.go
Expand Up @@ -30,7 +30,13 @@ func StartUp(config *configuration.BinocularsConfig) (func(), *sync.WaitGroup) {
os.Exit(-1)
}

grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, auth.ConfigureAuth(config.Auth))
authServices, err := auth.ConfigureAuth(config.Auth)
if err != nil {
log.Errorf("Failed to create auth services %s", err)
os.Exit(-1)
}

grpcServer := grpcCommon.CreateGrpcServer(config.Grpc.KeepaliveParams, config.Grpc.KeepaliveEnforcementPolicy, authServices)

logService := logs.NewKubernetesLogService(kubernetesClientProvider)
binocularsServer := server.NewBinocularsServer(logService)
Expand Down
15 changes: 8 additions & 7 deletions internal/common/auth/setup.go
Expand Up @@ -2,15 +2,16 @@ package auth

import (
"context"
"errors"

"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/auth/authorization"
"github.com/armadaproject/armada/internal/common/auth/authorization/groups"
"github.com/armadaproject/armada/internal/common/auth/configuration"
)

func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService {
authServices := []authorization.AuthService{}
func ConfigureAuth(config configuration.AuthConfig) ([]authorization.AuthService, error) {
var authServices []authorization.AuthService

if len(config.BasicAuth.Users) > 0 {
authServices = append(authServices,
Expand All @@ -25,7 +26,7 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService
if config.OpenIdAuth.ProviderUrl != "" {
openIdAuthService, err := authorization.NewOpenIdAuthServiceForProvider(context.Background(), &config.OpenIdAuth)
if err != nil {
panic(err)
return nil, errors.WithMessage(err, "error initialising openId auth")
}
authServices = append(authServices, openIdAuthService)
}
Expand All @@ -43,14 +44,14 @@ func ConfigureAuth(config configuration.AuthConfig) []authorization.AuthService

kerberosAuthService, err := authorization.NewKerberosAuthService(&config.Kerberos, groupLookup)
if err != nil {
panic(err)
return nil, errors.WithMessage(err, "error initialising kerberos auth")
}
authServices = append(authServices, kerberosAuthService)
}

if len(authServices) == 0 {
panic(errors.New("At least one auth method must be specified in config"))
return nil, errors.New("at least one auth method must be specified in config")
}

return authServices
return authServices, nil
}
16 changes: 16 additions & 0 deletions internal/common/grpc/grpc.go
@@ -1,10 +1,12 @@
package grpc

import (
"context"
"fmt"
"net"
"runtime/debug"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
Expand Down Expand Up @@ -105,6 +107,20 @@ func Listen(port uint16, grpcServer *grpc.Server, wg *sync.WaitGroup) {
}()
}

// CreateShutdownHandler returns a function that shuts down the grpcServer when the context is closed.
// The server is given gracePeriod to perform a graceful shutdown and is then forcibly stopped if necessary.
func CreateShutdownHandler(ctx context.Context, gracePeriod time.Duration, grpcServer *grpc.Server) func() error {
return func() error {
<-ctx.Done()
go func() {
time.Sleep(gracePeriod)
grpcServer.Stop()
}()
grpcServer.GracefulStop()
return nil
}
}

// This function is called whenever a gRPC handler panics.
func panicRecoveryHandler(p interface{}) (err error) {
log.Errorf("Request triggered panic with cause %v \n%s", p, string(debug.Stack()))
Expand Down
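
CreateShutdownHandler, added above, returns a closure that blocks until the context is cancelled, attempts a graceful stop, and falls back to a hard Stop once the grace period elapses. A possible usage sketch follows; the signal and errgroup wiring is an assumption for illustration and is not part of this commit.

package main

import (
	"context"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
)

// CreateShutdownHandler is reproduced from the diff above so that this
// sketch is self-contained; in the repo it lives in internal/common/grpc.
func CreateShutdownHandler(ctx context.Context, gracePeriod time.Duration, grpcServer *grpc.Server) func() error {
	return func() error {
		<-ctx.Done()
		go func() {
			time.Sleep(gracePeriod)
			grpcServer.Stop()
		}()
		grpcServer.GracefulStop()
		return nil
	}
}

func main() {
	// Cancel the context on SIGINT/SIGTERM so the shutdown handler fires.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	grpcServer := grpc.NewServer()

	g, ctx := errgroup.WithContext(ctx)
	g.Go(CreateShutdownHandler(ctx, 5*time.Second, grpcServer))
	// g.Go(func() error { ... serve on a net.Listener here ... })

	_ = g.Wait()
}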
11 changes: 9 additions & 2 deletions internal/scheduler/api.go
Expand Up @@ -39,7 +39,14 @@ func NewExecutorApi(producer pulsar.Producer,
executorRepository database.ExecutorRepository,
allowedPriorities []int32,
maxJobsPerCall uint,
) *ExecutorApi {
) (*ExecutorApi, error) {
if len(allowedPriorities) == 0 {
return nil, errors.New("allowedPriorities cannot be empty")
}
if maxJobsPerCall == 0 {
return nil, errors.New("maxJobsPerCall cannot be 0")
}

return &ExecutorApi{
producer: producer,
jobRepository: jobRepository,
Expand All @@ -48,7 +55,7 @@ func NewExecutorApi(producer pulsar.Producer,
maxJobsPerCall: maxJobsPerCall,
maxPulsarMessageSize: 1024 * 1024 * 2,
clock: clock.RealClock{},
}
}, nil
}

// LeaseJobRuns performs the following actions:
Expand Down
29 changes: 20 additions & 9 deletions internal/scheduler/api_test.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/armadaproject/armada/internal/common/compress"
Expand Down Expand Up @@ -47,11 +48,18 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Pool: "test-pool",
Nodes: []*schedulerobjects.Node{
{
Id: "test-executor-test-node",
TotalResources: schedulerobjects.ResourceList{},
JobRuns: []string{runId1.String(), runId2.String()},
AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{},
LastSeen: testClock.Now().UTC(),
Id: "test-executor-test-node",
TotalResources: schedulerobjects.ResourceList{},
JobRuns: []string{runId1.String(), runId2.String()},
AllocatableByPriorityAndResource: map[int32]schedulerobjects.ResourceList{
1000: {
Resources: map[string]resource.Quantity{},
},
2000: {
Resources: map[string]resource.Quantity{},
},
},
LastSeen: testClock.Now().UTC(),
},
},
MinimumJobSize: schedulerobjects.ResourceList{},
Expand Down Expand Up @@ -140,13 +148,14 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
capturedEvents = append(capturedEvents, msg)
}).AnyTimes()

server := NewExecutorApi(
server, err := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
[]int32{1000, 2000},
maxJobsPerCall,
)
require.NoError(t, err)
server.clock = testClock

err = server.LeaseJobRuns(mockStream)
Expand Down Expand Up @@ -209,14 +218,16 @@ func TestExecutorApi_Publish(t *testing.T) {
callback(pulsarutils.NewMessageId(1), msg, nil)
}).AnyTimes()

server := NewExecutorApi(
server, err := NewExecutorApi(
mockPulsarProducer,
mockJobRepository,
mockExecutorRepository,
[]int32{},
[]int32{1000, 2000},
100,
)

require.NoError(t, err)

empty, err := server.ReportEvents(ctx, &executorapi.EventList{Events: tc.sequences})
require.NoError(t, err)
assert.NotNil(t, empty)
Expand Down
10 changes: 0 additions & 10 deletions internal/scheduler/jobdb.go
Expand Up @@ -53,12 +53,6 @@ type SchedulerJob struct {
// Jobs with identical Queue and Priority
// are sorted by timestamp.
Timestamp int64
// Name of the executor to which this job has been assigned.
// Empty if this job has not yet been assigned.
Executor string
// Name of the node to which this job has been assigned.
// Empty if this job has not yet been assigned.
Node string
// True if the job is currently queued.
// If this is set then the job will not be considered for scheduling
Queued bool
Expand Down Expand Up @@ -155,7 +149,6 @@ func (job *SchedulerJob) DeepCopy() *SchedulerJob {
Jobset: job.Jobset,
Priority: job.Priority,
Timestamp: job.Timestamp,
Node: job.Node,
Queued: job.Queued,
jobSchedulingInfo: proto.Clone(job.jobSchedulingInfo).(*schedulerobjects.JobSchedulingInfo),
CancelRequested: job.CancelRequested,
Expand All @@ -172,8 +165,6 @@ type JobRun struct {
RunID uuid.UUID
// The name of the executor this run has been leased to
Executor string
// True if the job has been reported as pending by the executor
Pending bool
// True if the job has been reported as running by the executor
Running bool
// True if the job has been reported as succeeded by the executor
Expand All @@ -199,7 +190,6 @@ func (run *JobRun) DeepCopy() *JobRun {
return &JobRun{
RunID: run.RunID,
Executor: run.Executor,
Pending: run.Pending,
Running: run.Running,
Succeeded: run.Succeeded,
Failed: run.Failed,
Expand Down
2 changes: 0 additions & 2 deletions internal/scheduler/jobdb_test.go
Expand Up @@ -22,7 +22,6 @@ var job1 = &SchedulerJob{
Queue: "A",
Priority: 0,
Timestamp: 10,
Node: "",
jobSchedulingInfo: nil,
}

Expand All @@ -31,7 +30,6 @@ var job2 = &SchedulerJob{
Queue: "A",
Priority: 0,
Timestamp: 10,
Node: "",
jobSchedulingInfo: nil,
}

Expand Down
2 changes: 0 additions & 2 deletions internal/scheduler/publisher_test.go
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -185,7 +184,6 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) {
capturedPartitions[key] = true
}
if numPublished > tc.numSuccessfulPublishes {
log.Info("returning error")
return pulsarutils.NewMessageId(numPublished), errors.New("error from mock pulsar producer")
}
return pulsarutils.NewMessageId(numPublished), nil
Expand Down
29 changes: 13 additions & 16 deletions internal/scheduler/scheduler.go
Expand Up @@ -263,21 +263,24 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) {
if err != nil {
return nil, errors.Wrapf(err, "error retrieving job %s from jobDb ", jobId)
}

// If the job is nil at this point then it cannot be active.
// In this case we can ignore the run
if job == nil {
log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID)
continue
}

job = job.DeepCopy()
jobsToUpdateById[jobId] = job
}

// If the job is nil at this point then it cannot be active.
// In this case we can ignore the run
if job == nil {
log.Debugf("Job %s is not an active job. Ignoring update for run %s", jobId, dbRun.RunID)
continue
}

returnProcessed := false
run := job.RunById(dbRun.RunID)
if run == nil {
run = s.createSchedulerRun(&dbRun)
// TODO: we need to ensure that runs end up in the correct order here
// This will need us to store an order id in the db
job.Runs = append(job.Runs, run)
} else {
returnProcessed = run.Returned
Expand All @@ -290,20 +293,14 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*SchedulerJob, error) {
// do the same, but eventually we should send an actual queued message and this bit of code can disappear
if !returnProcessed && run.Returned && job.NumReturned() <= s.maxLeaseReturns {
job.Queued = true
job.Node = ""
job.Executor = ""
run.Failed = false // unset failed here so that we don't generate a job failed message later
}
}

// any jobs that don't have an active run need to be marked as queued
for _, job := range jobsToUpdateById {
run := job.CurrentRun()
if run == nil || run.InTerminalState() {
job.Queued = true
job.Node = ""
job.Executor = ""
}
job.Queued = run == nil || run.InTerminalState()
}

jobsToUpdate := maps.Values(jobsToUpdateById)
Expand Down Expand Up @@ -343,7 +340,7 @@ func (s *Scheduler) generateLeaseMessages(scheduledJobs []*SchedulerJob) ([]*arm
JobRunLeased: &armadaevents.JobRunLeased{
RunId: armadaevents.ProtoUuidFromUuid(job.CurrentRun().RunID),
JobId: jobId,
ExecutorId: job.Executor,
ExecutorId: job.CurrentRun().Executor,
},
},
},
Expand Down Expand Up @@ -608,7 +605,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx context.Context, pollInterval time.Dura
log.Infof("Successfully ensured that database state is up to date")
return nil
}
log.Infof("Recevied %d partitions, still waiting on %d", numSent, numSent-numReceived)
log.Infof("Recevied %d partitions, still waiting on %d", numReceived, numSent-numReceived)
s.clock.Sleep(pollInterval)
}
}
Expand Down
