Skip to content

Commit

Permalink
Re-attach stdout and stderr from new processes after retries (#8087)
Browse files Browse the repository at this point in the history
* Reattach stdout and stderr from new processes after retries

* Add changelog entry

* Add minor style corrections

* Prevent worker stdout/stderr from ending when the first stream ends
  • Loading branch information
rubennorte committed Mar 8, 2019
1 parent a5ae2f5 commit 95535e0
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,7 @@

- `[expect]` Compare DOM nodes even if there are multiple Node classes ([#8064](https://github.com/facebook/jest/pull/8064))
- `[jest-worker]` `worker.getStdout()` can return `null` ([#8083](https://github.com/facebook/jest/pull/8083))
- `[jest-worker]` Re-attach stdout and stderr from new processes/threads created after retries ([#8087](https://github.com/facebook/jest/pull/8087))

### Chore & Maintenance

Expand Down
1 change: 1 addition & 0 deletions packages/jest-worker/package.json
Expand Up @@ -17,6 +17,7 @@
"devDependencies": {
"@types/merge-stream": "^1.1.2",
"@types/supports-color": "^5.3.0",
"get-stream": "^4.1.0",
"worker-farm": "^1.6.0"
},
"engines": {
Expand Down
54 changes: 50 additions & 4 deletions packages/jest-worker/src/workers/ChildProcessWorker.ts
Expand Up @@ -6,6 +6,8 @@
*/

import childProcess, {ChildProcess} from 'child_process';
import {PassThrough} from 'stream';
import mergeStream from 'merge-stream';
import supportsColor from 'supports-color';

import {
Expand Down Expand Up @@ -43,12 +45,18 @@ export default class ChildProcessWorker implements WorkerInterface {
private _child!: ChildProcess;
private _options: WorkerOptions;
private _onProcessEnd!: OnEnd;
private _fakeStream: PassThrough | null;
private _request: ChildMessage | null;
private _retries!: number;
private _stderr: ReturnType<typeof mergeStream> | null;
private _stdout: ReturnType<typeof mergeStream> | null;

constructor(options: WorkerOptions) {
this._options = options;
this._fakeStream = null;
this._request = null;
this._stderr = null;
this._stdout = null;

this.initialize();
}
Expand All @@ -68,6 +76,26 @@ export default class ChildProcessWorker implements WorkerInterface {
...this._options.forkOptions,
});

if (child.stdout) {
if (!this._stdout) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stdout = mergeStream(this._getFakeStream());
}

this._stdout.add(child.stdout);
}

if (child.stderr) {
if (!this._stderr) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stderr = mergeStream(this._getFakeStream());
}

this._stderr.add(child.stderr);
}

child.on('message', this.onMessage.bind(this));
child.on('exit', this.onExit.bind(this));

Expand All @@ -79,6 +107,7 @@ export default class ChildProcessWorker implements WorkerInterface {
]);

this._child = child;

this._retries++;

// If we exceeded the amount of retries, we will emulate an error reply
Expand All @@ -97,6 +126,14 @@ export default class ChildProcessWorker implements WorkerInterface {
}
}

private _shutdown() {
// End the temporary streams so the merged streams end too
if (this._fakeStream) {
this._fakeStream.end();
this._fakeStream = null;
}
}

onMessage(response: ParentMessage) {
let error;

Expand Down Expand Up @@ -149,6 +186,8 @@ export default class ChildProcessWorker implements WorkerInterface {
if (this._request) {
this._child.send(this._request);
}
} else {
this._shutdown();
}
}

Expand All @@ -170,11 +209,18 @@ export default class ChildProcessWorker implements WorkerInterface {
return this._options.workerId;
}

getStdout() {
return this._child.stdout;
getStdout(): NodeJS.ReadableStream | null {
return this._stdout;
}

getStderr() {
return this._child.stderr;
getStderr(): NodeJS.ReadableStream | null {
return this._stderr;
}

private _getFakeStream() {
if (!this._fakeStream) {
this._fakeStream = new PassThrough();
}
return this._fakeStream;
}
}
54 changes: 50 additions & 4 deletions packages/jest-worker/src/workers/NodeThreadsWorker.ts
Expand Up @@ -6,9 +6,11 @@
*/

import path from 'path';
import {PassThrough} from 'stream';
// ESLint doesn't know about this experimental module
// eslint-disable-next-line import/no-unresolved
import {Worker} from 'worker_threads';
import mergeStream from 'merge-stream';

import {
CHILD_MESSAGE_INITIALIZE,
Expand All @@ -29,10 +31,17 @@ export default class ExperimentalWorker implements WorkerInterface {
private _onProcessEnd!: OnEnd;
private _request: ChildMessage | null;
private _retries!: number;
private _stderr: ReturnType<typeof mergeStream> | null;
private _stdout: ReturnType<typeof mergeStream> | null;
private _fakeStream: PassThrough | null;

constructor(options: WorkerOptions) {
this._options = options;
this._request = null;
this._stderr = null;
this._stdout = null;
this._fakeStream = null;

this.initialize();
}

Expand All @@ -54,6 +63,26 @@ export default class ExperimentalWorker implements WorkerInterface {
},
});

if (this._worker.stdout) {
if (!this._stdout) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stdout = mergeStream(this._getFakeStream());
}

this._stdout.add(this._worker.stdout);
}

if (this._worker.stderr) {
if (!this._stderr) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stderr = mergeStream(this._getFakeStream());
}

this._stderr.add(this._worker.stderr);
}

this._worker.on('message', this.onMessage.bind(this));
this._worker.on('exit', this.onExit.bind(this));

Expand Down Expand Up @@ -82,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface {
}
}

private _shutdown() {
// End the permanent stream so the merged stream end too
if (this._fakeStream) {
this._fakeStream.end();
this._fakeStream = null;
}
}

onMessage(response: ParentMessage) {
let error;

Expand Down Expand Up @@ -132,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface {
if (this._request) {
this._worker.postMessage(this._request);
}
} else {
this._shutdown();
}
}

Expand All @@ -154,11 +193,18 @@ export default class ExperimentalWorker implements WorkerInterface {
return this._options.workerId;
}

getStdout() {
return this._worker.stdout;
getStdout(): NodeJS.ReadableStream | null {
return this._stdout;
}

getStderr() {
return this._worker.stderr;
getStderr(): NodeJS.ReadableStream | null {
return this._stderr;
}

private _getFakeStream() {
if (!this._fakeStream) {
this._fakeStream = new PassThrough();
}
return this._fakeStream;
}
}
Expand Up @@ -9,6 +9,8 @@

import EventEmitter from 'events';
import supportsColor from 'supports-color';
import getStream from 'get-stream';
import {PassThrough} from 'stream';

import {
CHILD_MESSAGE_CALL,
Expand All @@ -30,8 +32,8 @@ beforeEach(() => {
childProcess.fork.mockImplementation(() => {
forkInterface = Object.assign(new EventEmitter(), {
send: jest.fn(),
stderr: {},
stdout: {},
stderr: new PassThrough(),
stdout: new PassThrough(),
});

return forkInterface;
Expand Down Expand Up @@ -124,15 +126,25 @@ it('stops initializing the worker after the amount of retries is exceeded', () =
expect(onProcessEnd.mock.calls[0][1]).toBe(null);
});

it('provides stdout and stderr fields from the child process', () => {
it('provides stdout and stderr from the child processes', async () => {
const worker = new Worker({
forkOptions: {},
maxRetries: 3,
workerPath: '/tmp/foo',
});

expect(worker.getStdout()).toBe(forkInterface.stdout);
expect(worker.getStderr()).toBe(forkInterface.stderr);
const stdout = worker.getStdout();
const stderr = worker.getStderr();

forkInterface.stdout.end('Hello ', {encoding: 'utf8'});
forkInterface.stderr.end('Jest ', {encoding: 'utf8'});
forkInterface.emit('exit');
forkInterface.stdout.end('World!', {encoding: 'utf8'});
forkInterface.stderr.end('Workers!', {encoding: 'utf8'});
forkInterface.emit('exit', 0);

await expect(getStream(stdout)).resolves.toEqual('Hello World!');
await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');
});

it('sends the task to the child process', () => {
Expand Down
Expand Up @@ -9,6 +9,8 @@

/* eslint-disable no-new */

import getStream from 'get-stream';

import {
CHILD_MESSAGE_CALL,
CHILD_MESSAGE_INITIALIZE,
Expand All @@ -24,10 +26,12 @@ beforeEach(() => {
jest.mock('worker_threads', () => {
const fakeClass = jest.fn(() => {
const EventEmitter = require('events');
const {PassThrough} = require('stream');

const thread = new EventEmitter();
thread.postMessage = jest.fn();
thread.stdout = 'stdout';
thread.stderr = 'stderr';
thread.stdout = new PassThrough();
thread.stderr = new PassThrough();
return thread;
});

Expand Down Expand Up @@ -134,15 +138,25 @@ it('stops initializing the worker after the amount of retries is exceeded', () =
expect(onProcessEnd.mock.calls[0][1]).toBe(null);
});

it('provides stdout and stderr fields from the child process', () => {
it('provides stdout and stderr from the child processes', async () => {
const worker = new Worker({
forkOptions: {},
maxRetries: 3,
workerPath: '/tmp/foo',
});

expect(worker.getStdout()).toBe('stdout');
expect(worker.getStderr()).toBe('stderr');
const stdout = worker.getStdout();
const stderr = worker.getStderr();

worker._worker.stdout.end('Hello ', {encoding: 'utf8'});
worker._worker.stderr.end('Jest ', {encoding: 'utf8'});
worker._worker.emit('exit');
worker._worker.stdout.end('World!', {encoding: 'utf8'});
worker._worker.stderr.end('Workers!', {encoding: 'utf8'});
worker._worker.emit('exit', 0);

await expect(getStream(stdout)).resolves.toEqual('Hello World!');
await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');
});

it('sends the task to the child process', () => {
Expand Down

0 comments on commit 95535e0

Please sign in to comment.