Skip to content

Commit

Permalink
Fix retries and error notification in workers (#8079)
Browse files Browse the repository at this point in the history
* Add tests that show that retries are broken in jest-worker

* Re-send requests to workers after they're re-created to retry

* Refactor request handling in workers

* Add changelog entry

* Fix lint issues

* Improve e2e test
  • Loading branch information
rubennorte committed Mar 7, 2019
1 parent b0cbcbf commit 0c9737e
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@
### Fixes

- `[jest-cli]` export functions compatible with `import {default}` ([#8080](https://github.com/facebook/jest/pull/8080))
- `[jest-worker]`: Fix retries and error notification in workers ([#8079](https://github.com/facebook/jest/pull/8079))

### Chore & Maintenance

Expand Down
48 changes: 48 additions & 0 deletions e2e/__tests__/fatalWorkerError.test.ts
@@ -0,0 +1,48 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

import path from 'path';
import os from 'os';
import {cleanup, writeFiles} from '../Utils';
import runJest from '../runJest';

const DIR = path.resolve(os.tmpdir(), 'fatal-worker-error');

beforeEach(() => cleanup(DIR));
afterAll(() => cleanup(DIR));

const NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS = 25;

test('fails a test that terminates the worker with a fatal error', () => {
const testFiles = {
'__tests__/fatalWorkerError.test.js': `
test('fatal worker error', () => {
process.exit(134);
});
`,
};

for (let i = 0; i <= NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS; i++) {
testFiles[`__tests__/test${i}.test.js`] = `
test('test ${i}', () => {});
`;
}

writeFiles(DIR, {
...testFiles,
'package.json': '{}',
});

const {status, stderr} = runJest(DIR, ['--maxWorkers=2']);

const numberOfTestsPassed = (stderr.match(/\bPASS\b/g) || []).length;

expect(status).not.toBe(0);
expect(numberOfTestsPassed).toBe(Object.keys(testFiles).length - 1);
expect(stderr).toContain('FAIL __tests__/fatalWorkerError.test.js');
expect(stderr).toContain('Call retries were exceeded');
});
17 changes: 15 additions & 2 deletions packages/jest-worker/src/workers/ChildProcessWorker.ts
Expand Up @@ -43,10 +43,13 @@ export default class ChildProcessWorker implements WorkerInterface {
private _child!: ChildProcess;
private _options: WorkerOptions;
private _onProcessEnd!: OnEnd;
private _request: ChildMessage | null;
private _retries!: number;

constructor(options: WorkerOptions) {
this._options = options;
this._request = null;

this.initialize();
}

Expand Down Expand Up @@ -142,13 +145,23 @@ export default class ChildProcessWorker implements WorkerInterface {
onExit(exitCode: number) {
if (exitCode !== 0) {
this.initialize();

if (this._request) {
this._child.send(this._request);
}
}
}

send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) {
onProcessStart(this);
this._onProcessEnd = onProcessEnd;

this._onProcessEnd = (...args) => {
// Clean the request to avoid sending past requests to workers that fail
// while waiting for a new request (timers, unhandled rejections...)
this._request = null;
return onProcessEnd(...args);
};

this._request = request;
this._retries = 0;
this._child.send(request);
}
Expand Down
16 changes: 14 additions & 2 deletions packages/jest-worker/src/workers/NodeThreadsWorker.ts
Expand Up @@ -27,10 +27,12 @@ export default class ExperimentalWorker implements WorkerInterface {
private _worker!: Worker;
private _options: WorkerOptions;
private _onProcessEnd!: OnEnd;
private _request: ChildMessage | null;
private _retries!: number;

constructor(options: WorkerOptions) {
this._options = options;
this._request = null;
this.initialize();
}

Expand Down Expand Up @@ -126,13 +128,23 @@ export default class ExperimentalWorker implements WorkerInterface {
onExit(exitCode: number) {
if (exitCode !== 0) {
this.initialize();

if (this._request) {
this._worker.postMessage(this._request);
}
}
}

send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) {
onProcessStart(this);
this._onProcessEnd = onProcessEnd;

this._onProcessEnd = (...args) => {
// Clean the request to avoid sending past requests to workers that fail
// while waiting for a new request (timers, unhandled rejections...)
this._request = null;
return onProcessEnd(...args);
};

this._request = request;
this._retries = 0;

this._worker.postMessage(request);
Expand Down
Expand Up @@ -151,6 +151,29 @@ it('sends the task to the child process', () => {
expect(forkInterface.send.mock.calls[1][0]).toEqual(request);
});

it('resends the task to the child process after a retry', () => {
const worker = new Worker({
forkOptions: {},
maxRetries: 3,
workerPath: '/tmp/foo/bar/baz.js',
});

const request = [CHILD_MESSAGE_CALL, false, 'foo', []];

worker.send(request, () => {}, () => {});

// Skipping call "0" because it corresponds to the "initialize" one.
expect(forkInterface.send.mock.calls[1][0]).toEqual(request);

const previousForkInterface = forkInterface;
forkInterface.emit('exit');

expect(forkInterface).not.toBe(previousForkInterface);

// Skipping call "0" because it corresponds to the "initialize" one.
expect(forkInterface.send.mock.calls[1][0]).toEqual(request);
});

it('calls the onProcessStart method synchronously if the queue is empty', () => {
const worker = new Worker({
forkOptions: {},
Expand Down
Expand Up @@ -160,6 +160,29 @@ it('sends the task to the child process', () => {
expect(worker._worker.postMessage.mock.calls[1][0]).toEqual(request);
});

it('resends the task to the child process after a retry', () => {
const worker = new Worker({
forkOptions: {},
maxRetries: 3,
workerPath: '/tmp/foo/bar/baz.js',
});

const request = [CHILD_MESSAGE_CALL, false, 'foo', []];

worker.send(request, () => {}, () => {});

// Skipping call "0" because it corresponds to the "initialize" one.
expect(worker._worker.postMessage.mock.calls[1][0]).toEqual(request);

const previousWorker = worker._worker;
worker._worker.emit('exit');

expect(worker._worker).not.toBe(previousWorker);

// Skipping call "0" because it corresponds to the "initialize" one.
expect(worker._worker.postMessage.mock.calls[1][0]).toEqual(request);
});

it('calls the onProcessStart method synchronously if the queue is empty', () => {
const worker = new Worker({
forkOptions: {},
Expand Down

0 comments on commit 0c9737e

Please sign in to comment.