Skip to content

Commit

Permalink
exit workers gracefully: basic child process impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jeysal committed Mar 25, 2019
1 parent 7e4e170 commit 47c8cea
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 59 deletions.
2 changes: 2 additions & 0 deletions packages/jest-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ Returns a `ReadableStream` where the standard error of all workers is piped. Not

### `end()`

TODO explain returned Promise, adapt examples

Finishes the workers by killing all workers. No further calls can be done to the `Worker` instance.

**Note:** Each worker has a unique id (index that starts with `1`) which is available on `process.env.JEST_WORKER_ID`
Expand Down
1 change: 0 additions & 1 deletion packages/jest-worker/src/WorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {

const canUseWorkerThreads = () => {
try {
// $FlowFixMe: Flow doesn't know about experimental APIs
require('worker_threads');
return true;
} catch (_) {
Expand Down
38 changes: 30 additions & 8 deletions packages/jest-worker/src/base/BaseWorkerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import mergeStream from 'merge-stream';

import {
CHILD_MESSAGE_END,
CHILD_MESSAGE_FORCE_EXIT,
PoolExitResult,
WorkerPoolOptions,
WorkerOptions,
WorkerInterface,
} from '../types';

// How long to wait for the child process to terminate
// after CHILD_MESSAGE_END before sending CHILD_MESSAGE_FORCE_EXIT
const FORCE_EXIT_DELAY = 100;
/* istanbul ignore next */
const emptyMethod = () => {};

Expand Down Expand Up @@ -85,15 +90,32 @@ export default class BaseWorkerPool {
throw Error('Missing method createWorker in WorkerPool');
}

end(): void {
async end(): Promise<PoolExitResult> {
// We do not cache the request object here. If so, it would only be only
// processed by one of the workers, and we want them all to close.
for (let i = 0; i < this._workers.length; i++) {
this._workers[i].send(
[CHILD_MESSAGE_END, false],
emptyMethod,
emptyMethod,
);
}
const workerExitPromises = this._workers.map(async worker => {
worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod);

// Schedule a force exit in case worker fails to exit gracefully so
// await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY
let forceExited = false;
const forceExitTimeout = setTimeout(() => {
worker.send([CHILD_MESSAGE_FORCE_EXIT], emptyMethod, emptyMethod);
forceExited = true;
}, FORCE_EXIT_DELAY);

await worker.waitForExit();
// Worker ideally exited gracefully, don't send force exit then
clearTimeout(forceExitTimeout);

return forceExited;
});

return (await Promise.all(workerExitPromises)).reduce<PoolExitResult>(
(result, forceExited) => ({
forceExited: result.forceExited || forceExited,
}),
{forceExited: false},
);
}
}
105 changes: 81 additions & 24 deletions packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@

'use strict';

import {CHILD_MESSAGE_END} from '../../types';
import {CHILD_MESSAGE_END, CHILD_MESSAGE_FORCE_EXIT} from '../../types';

import BaseWorkerPool from '../BaseWorkerPool';

const Worker = jest.fn(() => ({
getStderr: () => ({once() {}, pipe() {}}),
getStdout: () => ({once() {}, pipe() {}}),
send: jest.fn(),
}));
const Worker = jest.fn();

const mockSend = jest.fn();

Expand All @@ -31,6 +27,12 @@ class MockWorkerPool extends BaseWorkerPool {
describe('BaseWorkerPool', () => {
beforeEach(() => {
Worker.mockClear();
Worker.mockImplementation(() => ({
getStderr: () => ({once() {}, pipe() {}}),
getStdout: () => ({once() {}, pipe() {}}),
send: jest.fn(),
waitForExit: () => Promise.resolve(),
}));
});

it('throws error when createWorker is not defined', () => {
Expand Down Expand Up @@ -60,24 +62,6 @@ describe('BaseWorkerPool', () => {
expect(pool.getWorkerById(3)).toBeDefined();
});

it('ends all workers', () => {
const pool = new MockWorkerPool('/tmp/baz.js', {
forkOptions: {execArgv: []},
maxRetries: 6,
numWorkers: 4,
setupArgs: [],
});

const workers = pool.getWorkers();
pool.end();

const endMessage = [CHILD_MESSAGE_END, false];
expect(workers[0].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[1].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[2].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[3].send.mock.calls[0][0]).toEqual(endMessage);
});

it('creates and expoeses n workers', () => {
const pool = new MockWorkerPool('/tmp/baz.js', {
forkOptions: {execArgv: []},
Expand Down Expand Up @@ -221,4 +205,77 @@ describe('BaseWorkerPool', () => {
expect(() => farm.send()).not.toThrow();
expect(() => farm.send()).not.toThrow();
});

describe('end', () => {
it('ends all workers', async () => {
const pool = new MockWorkerPool('/tmp/baz.js', {
forkOptions: {execArgv: []},
maxRetries: 6,
numWorkers: 4,
setupArgs: [],
});

const workers = pool.getWorkers();
await pool.end();

const endMessage = [CHILD_MESSAGE_END, false];
expect(workers[0].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[1].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[2].send.mock.calls[0][0]).toEqual(endMessage);
expect(workers[3].send.mock.calls[0][0]).toEqual(endMessage);
});

it('resolves with forceExited=false if workers exited gracefully', async () => {
Worker.mockImplementation(() => ({
getStderr: () => null,
getStdout: () => null,
send: jest.fn(),
waitForExit: () => Promise.resolve(),
}));

const pool = new MockWorkerPool('/tmp/baz.js', {
forkOptions: {execArgv: []},
maxRetries: 6,
numWorkers: 4,
setupArgs: [],
});

expect(await pool.end()).toEqual({forceExited: false});
});

it('force exits workers that do not exit gracefully and resolves with forceExited=true', async () => {
// Set it up so that the first worker does not resolve waitForExit immediately,
// but only when it receives CHILD_MESSAGE_FORCE_EXIT
let worker0Exited;
Worker.mockImplementationOnce(() => ({
getStderr: () => null,
getStdout: () => null,
send: request => {
if (request[0] === CHILD_MESSAGE_FORCE_EXIT) {
worker0Exited();
}
},
waitForExit: () => new Promise(resolve => (worker0Exited = resolve)),
})).mockImplementation(() => ({
getStderr: () => null,
getStdout: () => null,
send: jest.fn(),
waitForExit: () => Promise.resolve(),
}));

const pool = new MockWorkerPool('/tmp/baz.js', {
forkOptions: {execArgv: []},
maxRetries: 6,
numWorkers: 2,
setupArgs: [],
});

const workers = pool.getWorkers();
expect(await pool.end()).toEqual({forceExited: true});

expect(workers[1].send).not.toHaveBeenCalledWith([
CHILD_MESSAGE_FORCE_EXIT,
]);
});
});
});
14 changes: 9 additions & 5 deletions packages/jest-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import os from 'os';
import WorkerPool from './WorkerPool';
import Farm from './Farm';
import {WorkerPoolInterface, WorkerPoolOptions, FarmOptions} from './types';
import {
WorkerPoolInterface,
WorkerPoolOptions,
FarmOptions,
PoolExitResult,
} from './types';

function getExposedMethods(
workerPath: string,
Expand Down Expand Up @@ -132,13 +137,12 @@ export default class JestWorker {
return this._workerPool.getStdout();
}

end(): void {
end(): Promise<PoolExitResult> {
if (this._ending) {
throw new Error('Farm is ended, no more calls can be done to it');
}

this._workerPool.end();

this._ending = true;

return this._workerPool.end();
}
}
24 changes: 16 additions & 8 deletions packages/jest-worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {ForkOptions} from 'child_process';
export const CHILD_MESSAGE_INITIALIZE: 0 = 0;
export const CHILD_MESSAGE_CALL: 1 = 1;
export const CHILD_MESSAGE_END: 2 = 2;
export const CHILD_MESSAGE_FORCE_EXIT: 3 = 3;

export const PARENT_MESSAGE_OK: 0 = 0;
export const PARENT_MESSAGE_CLIENT_ERROR: 1 = 1;
Expand All @@ -24,10 +25,6 @@ export type PARENT_MESSAGE_ERROR =
| typeof PARENT_MESSAGE_CLIENT_ERROR
| typeof PARENT_MESSAGE_SETUP_ERROR;

// Option objects.

export {ForkOptions};

export interface WorkerPoolInterface {
getStderr(): NodeJS.ReadableStream;
getStdout(): NodeJS.ReadableStream;
Expand All @@ -39,7 +36,7 @@ export interface WorkerPoolInterface {
onStart: OnStart,
onEnd: OnEnd,
): void;
end(): void;
end(): Promise<PoolExitResult>;
}

export interface WorkerInterface {
Expand All @@ -48,13 +45,21 @@ export interface WorkerInterface {
onProcessStart: OnStart,
onProcessEnd: OnEnd,
): void;
waitForExit(): Promise<void>;

getWorkerId(): number;
getStderr(): NodeJS.ReadableStream | null;
getStdout(): NodeJS.ReadableStream | null;
onExit(exitCode: number): void;
onMessage(message: ParentMessage): void;
}

export type PoolExitResult = {
forceExited: boolean;
};

// Option objects.

export {ForkOptions};

export type FarmOptions = {
computeWorkerKey?: (method: string, ...args: Array<unknown>) => string | null;
exposedMethods?: ReadonlyArray<string>;
Expand Down Expand Up @@ -116,10 +121,13 @@ export type ChildMessageEnd = [
boolean // processed
];

export type ChildMessageForceExit = [typeof CHILD_MESSAGE_FORCE_EXIT];

export type ChildMessage =
| ChildMessageInitialize
| ChildMessageCall
| ChildMessageEnd;
| ChildMessageEnd
| ChildMessageForceExit;

// Messages passed from the children to the parent.

Expand Down
37 changes: 27 additions & 10 deletions packages/jest-worker/src/workers/ChildProcessWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,30 @@ import {
export default class ChildProcessWorker implements WorkerInterface {
private _child!: ChildProcess;
private _options: WorkerOptions;
private _onProcessEnd!: OnEnd;
private _fakeStream: PassThrough | null;

private _request: ChildMessage | null;
private _retries!: number;
private _stderr: ReturnType<typeof mergeStream> | null;
private _onProcessEnd!: OnEnd;

private _fakeStream: PassThrough | null;
private _stdout: ReturnType<typeof mergeStream> | null;
private _stderr: ReturnType<typeof mergeStream> | null;

private _exitPromise: Promise<void>;
private _resolveExitPromise!: () => void;

constructor(options: WorkerOptions) {
this._options = options;
this._fakeStream = null;

this._request = null;
this._stderr = null;

this._fakeStream = null;
this._stdout = null;
this._stderr = null;

this._exitPromise = new Promise(resolve => {
this._resolveExitPromise = resolve;
});

this.initialize();
}
Expand Down Expand Up @@ -96,8 +107,8 @@ export default class ChildProcessWorker implements WorkerInterface {
this._stderr.add(child.stderr);
}

child.on('message', this.onMessage.bind(this));
child.on('exit', this.onExit.bind(this));
child.on('message', this._onMessage.bind(this));
child.on('exit', this._onExit.bind(this));

child.send([
CHILD_MESSAGE_INITIALIZE,
Expand All @@ -116,7 +127,7 @@ export default class ChildProcessWorker implements WorkerInterface {
if (this._retries > this._options.maxRetries) {
const error = new Error('Call retries were exceeded');

this.onMessage([
this._onMessage([
PARENT_MESSAGE_CLIENT_ERROR,
error.name,
error.message,
Expand All @@ -132,9 +143,11 @@ export default class ChildProcessWorker implements WorkerInterface {
this._fakeStream.end();
this._fakeStream = null;
}

this._resolveExitPromise();
}

onMessage(response: ParentMessage) {
private _onMessage(response: ParentMessage) {
let error;

switch (response[0]) {
Expand Down Expand Up @@ -179,7 +192,7 @@ export default class ChildProcessWorker implements WorkerInterface {
}
}

onExit(exitCode: number) {
private _onExit(exitCode: number) {
if (exitCode !== 0) {
this.initialize();

Expand All @@ -205,6 +218,10 @@ export default class ChildProcessWorker implements WorkerInterface {
this._child.send(request);
}

waitForExit() {
return this._exitPromise;
}

getWorkerId() {
return this._options.workerId;
}
Expand Down

0 comments on commit 47c8cea

Please sign in to comment.