Skip to content

Commit

Permalink
Add in-order scheduling policy (#10902)
Browse files Browse the repository at this point in the history
  • Loading branch information
Micha Reiser committed Jan 11, 2021
1 parent 931d466 commit f0dc993
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,7 @@
- `[jest-runtime, jest-transform]` share `cacheFS` between runtime and transformer ([#10901](https://github.com/facebook/jest/pull/10901))
- `[jest-transform]` Pass config options defined in Jest's config to transformer's `process` and `getCacheKey` functions ([#10926](https://github.com/facebook/jest/pull/10926))
- `[jest-worker]` Add support for custom task queues and adds a `PriorityQueue` implementation. ([#10921](https://github.com/facebook/jest/pull/10921))
- `[jest-worker]` Add in-order scheduling policy to jest worker ([10902](https://github.com/facebook/jest/pull/10902))

### Fixes

Expand Down
9 changes: 9 additions & 0 deletions packages/jest-worker/README.md
Expand Up @@ -91,6 +91,15 @@ Provide a custom worker pool to be used for spawning child processes. By default

`jest-worker` will automatically detect if `worker_threads` are available, but will not use them unless passed `enableWorkerThreads: true`.

### `workerSchedulingPolicy: 'round-robin' | 'in-order'` (optional)

Specifies the policy how tasks are assigned to workers if multiple workers are _idle_:

- `round-robin` (default): The task will be sequentially distributed onto the workers. The first task is assigned to the worker 1, the second to the worker 2, to ensure that the work is distributed across workers.
- `in-order`: The task will be assigned to the first free worker starting with worker 1 and only assign the work to worker 2 if the worker 1 is busy.

Tasks are always assigned to the first free worker as soon as tasks start to queue up. The scheduling policy does not define the task scheduling which is always first-in, first-out.

### `taskQueue`: TaskQueue` (optional)

The task queue defines in which order tasks (method calls) are processed by the workers. `jest-worker` ships with a `FifoQueue` and `PriorityQueue`:
Expand Down
46 changes: 29 additions & 17 deletions packages/jest-worker/src/Farm.ts
Expand Up @@ -22,28 +22,29 @@ import {
} from './types';

export default class Farm {
private _computeWorkerKey: FarmOptions['computeWorkerKey'];
private _cacheKeys: Record<string, WorkerInterface>;
private _callback: Function;
private _locks: Array<boolean>;
private _numOfWorkers: number;
private _offset: number;
private _taskQueue: TaskQueue;
private readonly _computeWorkerKey: FarmOptions['computeWorkerKey'];
private readonly _workerSchedulingPolicy: NonNullable<
FarmOptions['workerSchedulingPolicy']
>;
private readonly _cacheKeys: Record<string, WorkerInterface> = Object.create(
null,
);
private readonly _locks: Array<boolean> = [];
private _offset = 0;
private readonly _taskQueue: TaskQueue;

constructor(
numOfWorkers: number,
callback: Function,
private _numOfWorkers: number,
private _callback: Function,
options: {
computeWorkerKey?: FarmOptions['computeWorkerKey'];
workerSchedulingPolicy?: FarmOptions['workerSchedulingPolicy'];
taskQueue?: TaskQueue;
} = {},
) {
this._cacheKeys = Object.create(null);
this._callback = callback;
this._locks = [];
this._numOfWorkers = numOfWorkers;
this._offset = 0;
this._computeWorkerKey = options.computeWorkerKey;
this._workerSchedulingPolicy =
options.workerSchedulingPolicy ?? 'round-robin';
this._taskQueue = options.taskQueue ?? new FifoQueue();
}

Expand Down Expand Up @@ -147,19 +148,30 @@ export default class Farm {
private _push(task: QueueChildMessage): Farm {
this._taskQueue.enqueue(task);

const offset = this._getNextWorkerOffset();
for (let i = 0; i < this._numOfWorkers; i++) {
this._process((this._offset + i) % this._numOfWorkers);
this._process((offset + i) % this._numOfWorkers);

if (task.request[1]) {
break;
}
}

this._offset++;

return this;
}

// Typescript ensures that the switch statement is exhaustive.
// Adding an explicit return at the end would disable the exhaustive check void.
// eslint-disable-next-line consistent-return
private _getNextWorkerOffset(): number {
switch (this._workerSchedulingPolicy) {
case 'in-order':
return 0;
case 'round-robin':
return this._offset++;
}
}

private _lock(workerId: number): void {
this._locks[workerId] = true;
}
Expand Down
30 changes: 29 additions & 1 deletion packages/jest-worker/src/__tests__/process-integration.test.js
Expand Up @@ -83,17 +83,45 @@ describe('Jest Worker Integration', () => {

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');

assertCallsToChild(0, ['foo', 'param-0']);
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second call will go to the second child process.
const promise1 = farm.foo(1);
assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');
});

it('schedules the task on the first available child processes if the scheduling policy is in-order', async () => {
const farm = new Farm('/tmp/baz.js', {
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
workerSchedulingPolicy: 'in-order',
});

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');
assertCallsToChild(0, ['foo', 'param-0']);

// The second call will go to the second child process.
const promise1 = farm.foo(1);

// The first task on worker 0 completes
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second task on worker 1 completes
assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');

// The third call will go to the first child process
const promise2 = farm.foo('param-2');
assertCallsToChild(0, ['foo', 'param-0'], ['foo', 'param-2']);
replySuccess(0, 'worker-0');
expect(await promise2).toBe('worker-0');
});

it('distributes concurrent calls across child processes', async () => {
Expand Down
51 changes: 50 additions & 1 deletion packages/jest-worker/src/__tests__/thread-integration.test.js
Expand Up @@ -86,14 +86,63 @@ describe('Jest Worker Process Integration', () => {

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');

assertCallsToChild(0, ['foo', 'param-0']);
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second call will go to the second child process.
const promise1 = farm.foo(1);
assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');
});

it('schedules the task on the first available child processes if the scheduling policy is in-order', async () => {
const farm = new Farm('/tmp/baz.js', {
enableWorkerThreads: true,
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
workerSchedulingPolicy: 'in-order',
});

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');
assertCallsToChild(0, ['foo', 'param-0']);

// The second call will go to the second child process.
const promise1 = farm.foo(1);

// The first task on worker 0 completes
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second task on worker 1 completes
assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');

// The third call will go to the first child process
const promise2 = farm.foo('param-2');
assertCallsToChild(0, ['foo', 'param-0'], ['foo', 'param-2']);
replySuccess(0, 'worker-0');
expect(await promise2).toBe('worker-0');
});

it('schedules the task on the first available child processes', async () => {
const farm = new Farm('/tmp/baz.js', {
enableWorkerThreads: true,
exposedMethods: ['foo', 'bar'],
numWorkers: 4,
});

// The first call will go to the first child process.
const promise0 = farm.foo('param-0');
assertCallsToChild(0, ['foo', 'param-0']);
replySuccess(0, 'worker-0');
expect(await promise0).toBe('worker-0');

// The second call will go to the second child process.
const promise1 = farm.foo(1);
assertCallsToChild(1, ['foo', 1]);
replySuccess(1, 'worker-1');
expect(await promise1).toBe('worker-1');
Expand Down
1 change: 1 addition & 0 deletions packages/jest-worker/src/index.ts
Expand Up @@ -105,6 +105,7 @@ export class Worker {
{
computeWorkerKey: this._options.computeWorkerKey,
taskQueue: this._options.taskQueue,
workerSchedulingPolicy: this._options.workerSchedulingPolicy,
},
);

Expand Down
1 change: 1 addition & 0 deletions packages/jest-worker/src/types.ts
Expand Up @@ -96,6 +96,7 @@ export type FarmOptions = {
computeWorkerKey?: (method: string, ...args: Array<unknown>) => string | null;
exposedMethods?: ReadonlyArray<string>;
forkOptions?: ForkOptions;
workerSchedulingPolicy?: 'round-robin' | 'in-order';
resourceLimits?: ResourceLimits;
setupArgs?: Array<unknown>;
maxRetries?: number;
Expand Down

0 comments on commit f0dc993

Please sign in to comment.