Skip to content

Commit

Permalink
feat(worker): add start method to worker farms (#13937)
Browse files Browse the repository at this point in the history
  • Loading branch information
SimenB committed Feb 21, 2023
1 parent 1eb3bb5 commit 5858508
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,8 @@

### Features

- `[jest-worker]` Add `start` method to worker farms ([#13937](https://github.com/facebook/jest/pull/13937))

### Fixes

### Chore & Maintenance
Expand Down
8 changes: 7 additions & 1 deletion packages/jest-worker/README.md
Expand Up @@ -140,11 +140,17 @@ Returns a `ReadableStream` where the standard output of all workers is piped. No

Returns a `ReadableStream` where the standard error of all workers is piped. Note that the `silent` option of the child workers must be set to `true` to make it work. This is the default set by `jest-worker`, but keep it in mind when overriding options through `forkOptions`.

#### `start()`

Starts up every worker and calls their `setup` function, if it exists. Returns a `Promise` which resolves when all workers are running and have completed their `setup`.

This is useful if you want to start up all your workers eagerly before they are used to call any other functions.

#### `end()`

Finishes the workers by killing all workers. No further calls can be done to the `Worker` instance.

Returns a Promise that resolves with `{ forceExited: boolean }` once all workers are dead. If `forceExited` is `true`, at least one of the workers did not exit gracefully, which likely happened because it executed a leaky task that left handles open. This should be avoided, force exiting workers is a last resort to prevent creating lots of orphans.
Returns a `Promise` that resolves with `{ forceExited: boolean }` once all workers are dead. If `forceExited` is `true`, at least one of the workers did not exit gracefully, which likely happened because it executed a leaky task that left handles open. This should be avoided, force exiting workers is a last resort to prevent creating lots of orphans.

**Note:**

Expand Down
5 changes: 2 additions & 3 deletions packages/jest-worker/__benchmarks__/test.js
Expand Up @@ -87,11 +87,10 @@ function testJestWorker() {

async function countToFinish() {
if (++count === calls) {
farm.end();
const endTime = performance.now();

// Let all workers go down.
await sleep(2000);
await farm.end();

resolve({
globalTime: endTime - startTime - 2000,
Expand All @@ -110,7 +109,7 @@ function testJestWorker() {
farm.getStderr().pipe(process.stderr);

// Let all workers come up.
await sleep(2000);
await farm.start();

const startProcess = performance.now();

Expand Down
2 changes: 2 additions & 0 deletions packages/jest-worker/__typetests__/jest-worker.test.ts
Expand Up @@ -88,6 +88,8 @@ expectError(typedWorkerFarm.runTestAsync());
expectError(typedWorkerFarm.setup());
expectError(typedWorkerFarm.teardown());

expectType<Promise<void>>(typedWorkerFarm.start());

expectError<Promise<void>>(typedWorkerFarm.end());
expectType<Promise<{forceExited: boolean}>>(typedWorkerFarm.end());

Expand Down
1 change: 1 addition & 0 deletions packages/jest-worker/src/__tests__/index.test.ts
Expand Up @@ -89,6 +89,7 @@ it('exposes the right API using passed worker', () => {
getStdout: jest.fn(),
getWorkers: jest.fn(),
send: jest.fn(),
start: jest.fn(),
}));

const farm = new WorkerFarm('/tmp/baz.js', {
Expand Down
24 changes: 24 additions & 0 deletions packages/jest-worker/src/base/BaseWorkerPool.ts
Expand Up @@ -7,6 +7,7 @@

import mergeStream = require('merge-stream');
import {
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
PoolExitResult,
WorkerInterface,
Expand Down Expand Up @@ -87,6 +88,29 @@ export default class BaseWorkerPool {
throw Error('Missing method createWorker in WorkerPool');
}

async start(): Promise<void> {
await Promise.all(
this._workers.map(async worker => {
await worker.waitForWorkerReady();

await new Promise<void>((resolve, reject) => {
worker.send(
[CHILD_MESSAGE_CALL_SETUP],
emptyMethod,
error => {
if (error) {
reject(error);
} else {
resolve();
}
},
emptyMethod,
);
});
}),
);
}

async end(): Promise<PoolExitResult> {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
Expand Down
4 changes: 4 additions & 0 deletions packages/jest-worker/src/index.ts
Expand Up @@ -174,6 +174,10 @@ export class Worker {
return this._workerPool.getStdout();
}

async start(): Promise<void> {
await this._workerPool.start();
}

async end(): Promise<PoolExitResult> {
if (this._ending) {
throw new Error('Farm is ended, no more calls can be done to it');
Expand Down
7 changes: 6 additions & 1 deletion packages/jest-worker/src/types.ts
Expand Up @@ -36,6 +36,7 @@ export const CHILD_MESSAGE_INITIALIZE = 0;
export const CHILD_MESSAGE_CALL = 1;
export const CHILD_MESSAGE_END = 2;
export const CHILD_MESSAGE_MEM_USAGE = 3;
export const CHILD_MESSAGE_CALL_SETUP = 4;

export const PARENT_MESSAGE_OK = 0;
export const PARENT_MESSAGE_CLIENT_ERROR = 1;
Expand All @@ -61,6 +62,7 @@ export interface WorkerPoolInterface {
getWorkers(): Array<WorkerInterface>;
createWorker(options: WorkerOptions): WorkerInterface;
send: WorkerCallback;
start(): Promise<void>;
end(): Promise<PoolExitResult>;
}

Expand Down Expand Up @@ -223,11 +225,14 @@ export type ChildMessageEnd = [

export type ChildMessageMemUsage = [type: typeof CHILD_MESSAGE_MEM_USAGE];

export type ChildMessageCallSetup = [type: typeof CHILD_MESSAGE_CALL_SETUP];

export type ChildMessage =
| ChildMessageInitialize
| ChildMessageCall
| ChildMessageEnd
| ChildMessageMemUsage;
| ChildMessageMemUsage
| ChildMessageCallSetup;

// Messages passed from the children to the parent.

Expand Down
23 changes: 23 additions & 0 deletions packages/jest-worker/src/workers/processChild.ts
Expand Up @@ -8,6 +8,7 @@
import {isPromise} from 'jest-util';
import {
CHILD_MESSAGE_CALL,
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
CHILD_MESSAGE_INITIALIZE,
CHILD_MESSAGE_MEM_USAGE,
Expand Down Expand Up @@ -61,6 +62,28 @@ const messageListener: NodeJS.MessageListener = (request: any) => {
reportMemoryUsage();
break;

case CHILD_MESSAGE_CALL_SETUP:
if (initialized) {
reportSuccess(void 0);
} else {
const main = require(file!);

initialized = true;

if (main.setup) {
execFunction(
main.setup,
main,
setupArgs,
reportSuccess,
reportInitializeError,
);
} else {
reportSuccess(void 0);
}
}
break;

default:
throw new TypeError(
`Unexpected request from parent process: ${request[0]}`,
Expand Down
23 changes: 23 additions & 0 deletions packages/jest-worker/src/workers/threadChild.ts
Expand Up @@ -9,6 +9,7 @@ import {isMainThread, parentPort} from 'worker_threads';
import {isPromise} from 'jest-util';
import {
CHILD_MESSAGE_CALL,
CHILD_MESSAGE_CALL_SETUP,
CHILD_MESSAGE_END,
CHILD_MESSAGE_INITIALIZE,
CHILD_MESSAGE_MEM_USAGE,
Expand Down Expand Up @@ -63,6 +64,28 @@ const messageListener = (request: any) => {
reportMemoryUsage();
break;

case CHILD_MESSAGE_CALL_SETUP:
if (initialized) {
reportSuccess(void 0);
} else {
const main = require(file!);

initialized = true;

if (main.setup) {
execFunction(
main.setup,
main,
setupArgs,
reportSuccess,
reportInitializeError,
);
} else {
reportSuccess(void 0);
}
}
break;

default:
throw new TypeError(
`Unexpected request from parent process: ${request[0]}`,
Expand Down

0 comments on commit 5858508

Please sign in to comment.