Skip to content

Commit

Permalink
Prevent worker stdout/stderr from ending when the first stream ends
Browse files Browse the repository at this point in the history
  • Loading branch information
rubennorte committed Mar 8, 2019
1 parent 01332e8 commit fc4025b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 4 deletions.
28 changes: 26 additions & 2 deletions packages/jest-worker/src/workers/ChildProcessWorker.ts
Expand Up @@ -6,6 +6,7 @@
*/

import childProcess, {ChildProcess} from 'child_process';
import {PassThrough} from 'stream';
import mergeStream from 'merge-stream';
import supportsColor from 'supports-color';

Expand Down Expand Up @@ -44,13 +45,15 @@ export default class ChildProcessWorker implements WorkerInterface {
private _child!: ChildProcess;
private _options: WorkerOptions;
private _onProcessEnd!: OnEnd;
private _fakeStream: PassThrough | null;
private _request: ChildMessage | null;
private _retries!: number;
private _stderr: ReturnType<typeof mergeStream> | null;
private _stdout: ReturnType<typeof mergeStream> | null;

constructor(options: WorkerOptions) {
this._options = options;
this._fakeStream = null;
this._request = null;
this._stderr = null;
this._stdout = null;
Expand All @@ -75,15 +78,19 @@ export default class ChildProcessWorker implements WorkerInterface {

if (child.stdout) {
if (!this._stdout) {
this._stdout = mergeStream();
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stdout = mergeStream(this._getFakeStream());
}

this._stdout.add(child.stdout);
}

if (child.stderr) {
if (!this._stderr) {
this._stderr = mergeStream();
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stderr = mergeStream(this._getFakeStream());
}

this._stderr.add(child.stderr);
Expand Down Expand Up @@ -119,6 +126,14 @@ export default class ChildProcessWorker implements WorkerInterface {
}
}

private _shutdown() {
// End the temporary streams so the merged streams end too
if (this._fakeStream) {
this._fakeStream.end();
this._fakeStream = null;
}
}

onMessage(response: ParentMessage) {
let error;

Expand Down Expand Up @@ -171,6 +186,8 @@ export default class ChildProcessWorker implements WorkerInterface {
if (this._request) {
this._child.send(this._request);
}
} else {
this._shutdown();
}
}

Expand Down Expand Up @@ -199,4 +216,11 @@ export default class ChildProcessWorker implements WorkerInterface {
getStderr(): NodeJS.ReadableStream | null {
return this._stderr;
}

private _getFakeStream() {
if (!this._fakeStream) {
this._fakeStream = new PassThrough();
}
return this._fakeStream;
}
}
28 changes: 26 additions & 2 deletions packages/jest-worker/src/workers/NodeThreadsWorker.ts
Expand Up @@ -6,6 +6,7 @@
*/

import path from 'path';
import {PassThrough} from 'stream';
// ESLint doesn't know about this experimental module
// eslint-disable-next-line import/no-unresolved
import {Worker} from 'worker_threads';
Expand All @@ -32,12 +33,14 @@ export default class ExperimentalWorker implements WorkerInterface {
private _retries!: number;
private _stderr: ReturnType<typeof mergeStream> | null;
private _stdout: ReturnType<typeof mergeStream> | null;
private _fakeStream: PassThrough | null;

constructor(options: WorkerOptions) {
this._options = options;
this._request = null;
this._stderr = null;
this._stdout = null;
this._fakeStream = null;

this.initialize();
}
Expand All @@ -62,15 +65,19 @@ export default class ExperimentalWorker implements WorkerInterface {

if (this._worker.stdout) {
if (!this._stdout) {
this._stdout = mergeStream();
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stdout = mergeStream(this._getFakeStream());
}

this._stdout.add(this._worker.stdout);
}

if (this._worker.stderr) {
if (!this._stderr) {
this._stderr = mergeStream();
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stderr = mergeStream(this._getFakeStream());
}

this._stderr.add(this._worker.stderr);
Expand Down Expand Up @@ -104,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface {
}
}

private _shutdown() {
// End the permanent stream so the merged stream end too
if (this._fakeStream) {
this._fakeStream.end();
this._fakeStream = null;
}
}

onMessage(response: ParentMessage) {
let error;

Expand Down Expand Up @@ -154,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface {
if (this._request) {
this._worker.postMessage(this._request);
}
} else {
this._shutdown();
}
}

Expand Down Expand Up @@ -183,4 +200,11 @@ export default class ExperimentalWorker implements WorkerInterface {
getStderr(): NodeJS.ReadableStream | null {
return this._stderr;
}

private _getFakeStream() {
if (!this._fakeStream) {
this._fakeStream = new PassThrough();
}
return this._fakeStream;
}
}
Expand Up @@ -141,6 +141,7 @@ it('provides stdout and stderr from the child processes', async () => {
forkInterface.emit('exit');
forkInterface.stdout.end('World!', {encoding: 'utf8'});
forkInterface.stderr.end('Workers!', {encoding: 'utf8'});
forkInterface.emit('exit', 0);

await expect(getStream(stdout)).resolves.toEqual('Hello World!');
await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');
Expand Down
Expand Up @@ -153,6 +153,7 @@ it('provides stdout and stderr from the child processes', async () => {
worker._worker.emit('exit');
worker._worker.stdout.end('World!', {encoding: 'utf8'});
worker._worker.stderr.end('Workers!', {encoding: 'utf8'});
worker._worker.emit('exit', 0);

await expect(getStream(stdout)).resolves.toEqual('Hello World!');
await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');
Expand Down

0 comments on commit fc4025b

Please sign in to comment.