diff --git a/CHANGELOG.md b/CHANGELOG.md index a95862c27536..49322624ffe0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ - `[babel-plugin-jest-hoist]` Show codeframe on static hoisting issues ([#8865](https://github.com/facebook/jest/pull/8865)) - `[jest-config]` [**BREAKING**] Set default display name color based on runner ([#8689](https://github.com/facebook/jest/pull/8689)) - `[jest-diff]` Add options for colors and symbols ([#8841](https://github.com/facebook/jest/pull/8841)) +- `[jest-runner]` Warn if a worker had to be force exited ([#8206](https://github.com/facebook/jest/pull/8206)) - `[@jest/test-result]` Create method to create empty `TestResult` ([#8867](https://github.com/facebook/jest/pull/8867)) +- `[jest-worker]` [**BREAKING**] Return a promise from `end()`, resolving with the information whether workers exited gracefully ([#8206](https://github.com/facebook/jest/pull/8206)) ### Fixes diff --git a/e2e/Utils.ts b/e2e/Utils.ts index 07523a1401c1..573bf3d3fd2a 100644 --- a/e2e/Utils.ts +++ b/e2e/Utils.ts @@ -90,6 +90,21 @@ export const writeFiles = ( }); }; +const NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS = 25; +/** + * Forces Jest to use workers by generating many test files to run. + * Slow and modifies the test output. Use sparingly. + */ +export const generateTestFilesToForceUsingWorkers = () => { + const testFiles = {}; + for (let i = 0; i <= NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS; i++) { + testFiles[`__tests__/test${i}.test.js`] = ` + test.todo('test ${i}'); + `; + } + return testFiles; +}; + export const copyDir = (src: string, dest: string) => { const srcStat = fs.lstatSync(src); if (srcStat.isDirectory()) { diff --git a/e2e/__tests__/fatalWorkerError.test.ts b/e2e/__tests__/fatalWorkerError.test.ts index 59f1ed080c04..4085a9192e97 100644 --- a/e2e/__tests__/fatalWorkerError.test.ts +++ b/e2e/__tests__/fatalWorkerError.test.ts @@ -7,7 +7,11 @@ import * as path from 'path'; import {tmpdir} from 'os'; -import {cleanup, writeFiles} from '../Utils'; +import { + cleanup, + generateTestFilesToForceUsingWorkers, + writeFiles, +} from '../Utils'; import runJest from '../runJest'; const DIR = path.resolve(tmpdir(), 'fatal-worker-error'); @@ -15,10 +19,9 @@ const DIR = path.resolve(tmpdir(), 'fatal-worker-error'); beforeEach(() => cleanup(DIR)); afterAll(() => cleanup(DIR)); -const NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS = 25; - test('fails a test that terminates the worker with a fatal error', () => { const testFiles = { + ...generateTestFilesToForceUsingWorkers(), '__tests__/fatalWorkerError.test.js': ` test('fatal worker error', () => { process.exit(134); @@ -26,12 +29,6 @@ test('fails a test that terminates the worker with a fatal error', () => { `, }; - for (let i = 0; i <= NUMBER_OF_TESTS_TO_FORCE_USING_WORKERS; i++) { - testFiles[`__tests__/test${i}.test.js`] = ` - test('test ${i}', () => {}); - `; - } - writeFiles(DIR, { ...testFiles, 'package.json': '{}', diff --git a/e2e/__tests__/workerForceExit.test.ts b/e2e/__tests__/workerForceExit.test.ts new file mode 100644 index 000000000000..87bc44c09598 --- /dev/null +++ b/e2e/__tests__/workerForceExit.test.ts @@ -0,0 +1,69 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import {tmpdir} from 'os'; +import {resolve} from 'path'; +import findProcess from 'find-process'; + +import { + cleanup, + generateTestFilesToForceUsingWorkers, + writeFiles, +} from '../Utils'; +import runJest from '../runJest'; + +const DIR = resolve(tmpdir(), 'worker-force-exit'); + +beforeEach(() => cleanup(DIR)); +afterEach(() => cleanup(DIR)); +const testFiles = { + ...generateTestFilesToForceUsingWorkers(), + 'package.json': `{ + "testEnvironment": "node" + }`, +}; + +const verifyNumPassed = stderr => { + const numberOfTestsPassed = (stderr.match(/\bPASS\b/g) || []).length; + // assuming -1 because of package.json, but +1 because of the individual test file + expect(numberOfTestsPassed).toBe(Object.keys(testFiles).length); +}; + +test('prints a warning if a worker is force exited', () => { + writeFiles(DIR, { + ...testFiles, + '__tests__/simple.test.js': ` + test('t', () => { + require('http').createServer().listen(0); + }); + `, + }); + const {status, stderr, stdout} = runJest(DIR, ['--maxWorkers=2']); + + expect(status).toBe(0); + verifyNumPassed(stderr); + expect(stdout).toContain('A worker process has failed to exit gracefully'); +}); + +test('force exits a worker that fails to exit gracefully', async () => { + writeFiles(DIR, { + ...testFiles, + '__tests__/timeoutKilled.test.js': ` + test('t', () => { + require('http').createServer().listen(0); + console.error('pid: ' + process.pid); + }); + `, + }); + const {status, stderr} = runJest(DIR, ['--maxWorkers=2']); + + expect(status).toBe(0); + verifyNumPassed(stderr); + + const [pid] = /pid: \d+/.exec(stderr); + expect(await findProcess('pid', pid)).toHaveLength(0); +}); diff --git a/package.json b/package.json index c595179799a3..2fff8137dc90 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "eslint-plugin-react": "^7.1.0", "execa": "^2.0.4", "fast-check": "^1.13.0", + "find-process": "^1.4.1", "glob": "^7.1.1", "graceful-fs": "^4.1.15", "isbinaryfile": "^4.0.0", diff --git a/packages/jest-runner/src/__tests__/testRunner.test.ts b/packages/jest-runner/src/__tests__/testRunner.test.ts index 6a13db1b9c09..8853a1c8b9bd 100644 --- a/packages/jest-runner/src/__tests__/testRunner.test.ts +++ b/packages/jest-runner/src/__tests__/testRunner.test.ts @@ -15,7 +15,7 @@ jest.mock('jest-worker', () => jest.fn( worker => (mockWorkerFarm = { - end: jest.fn(), + end: jest.fn().mockResolvedValue({forceExited: false}), getStderr: jest.fn(), getStdout: jest.fn(), worker: jest.fn((data, callback) => require(worker)(data, callback)), diff --git a/packages/jest-runner/src/index.ts b/packages/jest-runner/src/index.ts index b57ad2ab3204..e838e63e06b6 100644 --- a/packages/jest-runner/src/index.ts +++ b/packages/jest-runner/src/index.ts @@ -8,6 +8,7 @@ import {Config} from '@jest/types'; import {SerializableError} from '@jest/test-result'; import exit = require('exit'); +import chalk from 'chalk'; import throat from 'throat'; import Worker from 'jest-worker'; import runTest from './runTest'; @@ -189,7 +190,18 @@ class TestRunner { ), ); - const cleanup = () => worker.end(); + const cleanup = async () => { + const {forceExited} = await worker.end(); + if (forceExited) { + console.log( + chalk.yellow( + 'A worker process has failed to exit gracefully and has been force exited. ' + + 'This is likely caused by tests leaking due to improper teardown. ' + + 'Try running with --runInBand --detectOpenHandles to find leaks.', + ), + ); + } + }; return Promise.race([runAllTests, onInterrupt]).then(cleanup, cleanup); } } diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md index e6ca3a798d10..932326202edb 100644 --- a/packages/jest-worker/README.md +++ b/packages/jest-worker/README.md @@ -109,6 +109,8 @@ Returns a `ReadableStream` where the standard error of all workers is piped. Not Finishes the workers by killing all workers. No further calls can be done to the `Worker` instance. +Returns a Promise that resolves with `{ forceExited: boolean }` once all workers are dead. If `forceExited` is `true`, at least one of the workers did not exit gracefully, which likely happened because it executed a leaky task that left handles open. This should be avoided, force exiting workers is a last resort to prevent creating lots of orphans. + **Note:** Each worker has a unique id (index that starts with `1`) which is available on `process.env.JEST_WORKER_ID` ## Setting up and tearing down the child process @@ -139,7 +141,10 @@ async function main() { console.log(await myWorker.bar('Bob')); // "Hello from bar: Bob" console.log(await myWorker.getWorkerId()); // "3" -> this message has sent from the 3rd worker - myWorker.end(); + const {forceExited} = await myWorker.end(); + if (forceExited) { + console.error('Workers failed to exit gracefully'); + } } main(); @@ -186,7 +191,10 @@ async function main() { // the same worker that processed the file the first time will process it now. console.log(await myWorker.transform('/tmp/foo.js')); - myWorker.end(); + const {forceExited} = await myWorker.end(); + if (forceExited) { + console.error('Workers failed to exit gracefully'); + } } main(); diff --git a/packages/jest-worker/src/__tests__/index.test.js b/packages/jest-worker/src/__tests__/index.test.js index eae8116f2e1f..5fbd5293fef5 100644 --- a/packages/jest-worker/src/__tests__/index.test.js +++ b/packages/jest-worker/src/__tests__/index.test.js @@ -133,7 +133,7 @@ it('does not let make calls after the farm is ended', () => { ); }); -it('does not let end the farm after it is ended', () => { +it('does not let end the farm after it is ended', async () => { const farm = new Farm('/tmp/baz.js', { exposedMethods: ['foo', 'bar'], numWorkers: 4, @@ -141,10 +141,10 @@ it('does not let end the farm after it is ended', () => { farm.end(); expect(farm._workerPool.end).toHaveBeenCalledTimes(1); - expect(() => farm.end()).toThrow( + await expect(farm.end()).rejects.toThrow( 'Farm is ended, no more calls can be done to it', ); - expect(() => farm.end()).toThrow( + await expect(farm.end()).rejects.toThrow( 'Farm is ended, no more calls can be done to it', ); expect(farm._workerPool.end).toHaveBeenCalledTimes(1); diff --git a/packages/jest-worker/src/base/BaseWorkerPool.ts b/packages/jest-worker/src/base/BaseWorkerPool.ts index 62fa3427ed3d..5eec43c0214c 100644 --- a/packages/jest-worker/src/base/BaseWorkerPool.ts +++ b/packages/jest-worker/src/base/BaseWorkerPool.ts @@ -10,11 +10,16 @@ import mergeStream = require('merge-stream'); import { CHILD_MESSAGE_END, + PoolExitResult, WorkerInterface, WorkerOptions, WorkerPoolOptions, } from '../types'; +// How long to wait for the child process to terminate +// after CHILD_MESSAGE_END before sending force exiting. +const FORCE_EXIT_DELAY = 500; + /* istanbul ignore next */ const emptyMethod = () => {}; @@ -85,15 +90,33 @@ export default class BaseWorkerPool { throw Error('Missing method createWorker in WorkerPool'); } - end(): void { + async end(): Promise { // We do not cache the request object here. If so, it would only be only // processed by one of the workers, and we want them all to close. - for (let i = 0; i < this._workers.length; i++) { - this._workers[i].send( - [CHILD_MESSAGE_END, false], - emptyMethod, - emptyMethod, - ); - } + const workerExitPromises = this._workers.map(async worker => { + worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod); + + // Schedule a force exit in case worker fails to exit gracefully so + // await worker.waitForExit() never takes longer than FORCE_EXIT_DELAY + let forceExited = false; + const forceExitTimeout = setTimeout(() => { + worker.forceExit(); + forceExited = true; + }, FORCE_EXIT_DELAY); + + await worker.waitForExit(); + // Worker ideally exited gracefully, don't send force exit then + clearTimeout(forceExitTimeout); + + return forceExited; + }); + + const workerExits = await Promise.all(workerExitPromises); + return workerExits.reduce( + (result, forceExited) => ({ + forceExited: result.forceExited || forceExited, + }), + {forceExited: false}, + ); } } diff --git a/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js b/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js index ef034b5de8c7..c249c0a2bd2c 100644 --- a/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js +++ b/packages/jest-worker/src/base/__tests__/BaseWorkerPool.test.js @@ -11,11 +11,7 @@ import {CHILD_MESSAGE_END} from '../../types'; import BaseWorkerPool from '../BaseWorkerPool'; -const Worker = jest.fn(() => ({ - getStderr: () => ({once() {}, pipe() {}}), - getStdout: () => ({once() {}, pipe() {}}), - send: jest.fn(), -})); +const Worker = jest.fn(); const mockSend = jest.fn(); @@ -31,6 +27,13 @@ class MockWorkerPool extends BaseWorkerPool { describe('BaseWorkerPool', () => { beforeEach(() => { Worker.mockClear(); + Worker.mockImplementation(() => ({ + forceExit: jest.fn(), + getStderr: () => ({once() {}, pipe() {}}), + getStdout: () => ({once() {}, pipe() {}}), + send: jest.fn(), + waitForExit: () => Promise.resolve(), + })); }); it('throws error when createWorker is not defined', () => { @@ -60,24 +63,6 @@ describe('BaseWorkerPool', () => { expect(pool.getWorkerById(3)).toBeDefined(); }); - it('ends all workers', () => { - const pool = new MockWorkerPool('/tmp/baz.js', { - forkOptions: {execArgv: []}, - maxRetries: 6, - numWorkers: 4, - setupArgs: [], - }); - - const workers = pool.getWorkers(); - pool.end(); - - const endMessage = [CHILD_MESSAGE_END, false]; - expect(workers[0].send.mock.calls[0][0]).toEqual(endMessage); - expect(workers[1].send.mock.calls[0][0]).toEqual(endMessage); - expect(workers[2].send.mock.calls[0][0]).toEqual(endMessage); - expect(workers[3].send.mock.calls[0][0]).toEqual(endMessage); - }); - it('creates and expoeses n workers', () => { const pool = new MockWorkerPool('/tmp/baz.js', { forkOptions: {execArgv: []}, @@ -221,4 +206,76 @@ describe('BaseWorkerPool', () => { expect(() => farm.send()).not.toThrow(); expect(() => farm.send()).not.toThrow(); }); + + describe('end', () => { + it('ends all workers', async () => { + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }); + + const workers = pool.getWorkers(); + await pool.end(); + + const endMessage = [CHILD_MESSAGE_END, false]; + expect(workers[0].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[1].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[2].send.mock.calls[0][0]).toEqual(endMessage); + expect(workers[3].send.mock.calls[0][0]).toEqual(endMessage); + }); + + it('resolves with forceExited=false if workers exited gracefully', async () => { + Worker.mockImplementation(() => ({ + forceExit: jest.fn(), + getStderr: () => null, + getStdout: () => null, + send: jest.fn(), + waitForExit: () => Promise.resolve(), + })); + + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 4, + setupArgs: [], + }); + + expect(await pool.end()).toEqual({forceExited: false}); + }); + + it('force exits workers that do not exit gracefully and resolves with forceExited=true', async () => { + // Set it up so that the first worker does not resolve waitForExit immediately, + // but only when forceExit() is called + let worker0Exited; + Worker.mockImplementationOnce(() => ({ + forceExit: () => { + worker0Exited(); + }, + getStderr: () => null, + getStdout: () => null, + send: jest.fn(), + waitForExit: () => new Promise(resolve => (worker0Exited = resolve)), + })).mockImplementation(() => ({ + forceExit: jest.fn(), + getStderr: () => null, + getStdout: () => null, + send: jest.fn(), + waitForExit: () => Promise.resolve(), + })); + + const pool = new MockWorkerPool('/tmp/baz.js', { + forkOptions: {execArgv: []}, + maxRetries: 6, + numWorkers: 2, + setupArgs: [], + }); + + const workers = pool.getWorkers(); + expect(await pool.end()).toEqual({forceExited: true}); + + expect(workers[1].forceExit).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/jest-worker/src/index.ts b/packages/jest-worker/src/index.ts index 114d884eb16e..a253e0b489c6 100644 --- a/packages/jest-worker/src/index.ts +++ b/packages/jest-worker/src/index.ts @@ -8,7 +8,12 @@ import {cpus} from 'os'; import WorkerPool from './WorkerPool'; import Farm from './Farm'; -import {FarmOptions, WorkerPoolInterface, WorkerPoolOptions} from './types'; +import { + FarmOptions, + PoolExitResult, + WorkerPoolInterface, + WorkerPoolOptions, +} from './types'; function getExposedMethods( workerPath: string, @@ -132,13 +137,12 @@ export default class JestWorker { return this._workerPool.getStdout(); } - end(): void { + async end(): Promise { if (this._ending) { throw new Error('Farm is ended, no more calls can be done to it'); } - - this._workerPool.end(); - this._ending = true; + + return this._workerPool.end(); } } diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index dc5bafc23835..c61b6303247b 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -24,10 +24,6 @@ export type PARENT_MESSAGE_ERROR = | typeof PARENT_MESSAGE_CLIENT_ERROR | typeof PARENT_MESSAGE_SETUP_ERROR; -// Option objects. - -export {ForkOptions}; - export interface WorkerPoolInterface { getStderr(): NodeJS.ReadableStream; getStdout(): NodeJS.ReadableStream; @@ -39,7 +35,7 @@ export interface WorkerPoolInterface { onStart: OnStart, onEnd: OnEnd, ): void; - end(): void; + end(): Promise; } export interface WorkerInterface { @@ -48,13 +44,22 @@ export interface WorkerInterface { onProcessStart: OnStart, onProcessEnd: OnEnd, ): void; + waitForExit(): Promise; + forceExit(): void; + getWorkerId(): number; getStderr(): NodeJS.ReadableStream | null; getStdout(): NodeJS.ReadableStream | null; - onExit(exitCode: number): void; - onMessage(message: ParentMessage): void; } +export type PoolExitResult = { + forceExited: boolean; +}; + +// Option objects. + +export {ForkOptions}; + export type FarmOptions = { computeWorkerKey?: (method: string, ...args: Array) => string | null; exposedMethods?: ReadonlyArray; diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index ce712771042c..a6b3e3ac02f2 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -23,6 +23,13 @@ import { WorkerOptions, } from '../types'; +const SIGNAL_BASE_EXIT_CODE = 128; +const SIGKILL_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 9; +const SIGTERM_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 15; + +// How long to wait after SIGTERM before sending SIGKILL +const SIGKILL_DELAY = 500; + /** * This class wraps the child process and provides a nice interface to * communicate with. It takes care of: @@ -44,19 +51,30 @@ import { export default class ChildProcessWorker implements WorkerInterface { private _child!: ChildProcess; private _options: WorkerOptions; - private _onProcessEnd!: OnEnd; - private _fakeStream: PassThrough | null; + private _request: ChildMessage | null; private _retries!: number; - private _stderr: ReturnType | null; + private _onProcessEnd!: OnEnd; + + private _fakeStream: PassThrough | null; private _stdout: ReturnType | null; + private _stderr: ReturnType | null; + + private _exitPromise: Promise; + private _resolveExitPromise!: () => void; constructor(options: WorkerOptions) { this._options = options; - this._fakeStream = null; + this._request = null; - this._stderr = null; + + this._fakeStream = null; this._stdout = null; + this._stderr = null; + + this._exitPromise = new Promise(resolve => { + this._resolveExitPromise = resolve; + }); this.initialize(); } @@ -96,8 +114,8 @@ export default class ChildProcessWorker implements WorkerInterface { this._stderr.add(child.stderr); } - child.on('message', this.onMessage.bind(this)); - child.on('exit', this.onExit.bind(this)); + child.on('message', this._onMessage.bind(this)); + child.on('exit', this._onExit.bind(this)); child.send([ CHILD_MESSAGE_INITIALIZE, @@ -116,7 +134,7 @@ export default class ChildProcessWorker implements WorkerInterface { if (this._retries > this._options.maxRetries) { const error = new Error('Call retries were exceeded'); - this.onMessage([ + this._onMessage([ PARENT_MESSAGE_CLIENT_ERROR, error.name, error.message, @@ -132,9 +150,11 @@ export default class ChildProcessWorker implements WorkerInterface { this._fakeStream.end(); this._fakeStream = null; } + + this._resolveExitPromise(); } - onMessage(response: ParentMessage) { + private _onMessage(response: ParentMessage) { let error; switch (response[0]) { @@ -179,8 +199,12 @@ export default class ChildProcessWorker implements WorkerInterface { } } - onExit(exitCode: number) { - if (exitCode !== 0) { + private _onExit(exitCode: number) { + if ( + exitCode !== 0 && + exitCode !== SIGTERM_EXIT_CODE && + exitCode !== SIGKILL_EXIT_CODE + ) { this.initialize(); if (this._request) { @@ -205,6 +229,19 @@ export default class ChildProcessWorker implements WorkerInterface { this._child.send(request); } + waitForExit() { + return this._exitPromise; + } + + forceExit() { + this._child.kill('SIGTERM'); + const sigkillTimeout = setTimeout( + () => this._child.kill('SIGKILL'), + SIGKILL_DELAY, + ); + this._exitPromise.then(() => clearTimeout(sigkillTimeout)); + } + getWorkerId() { return this._options.workerId; } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 071fe3b208d4..f33181f9f874 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -28,19 +28,32 @@ import { export default class ExperimentalWorker implements WorkerInterface { private _worker!: Worker; private _options: WorkerOptions; - private _onProcessEnd!: OnEnd; + private _request: ChildMessage | null; private _retries!: number; - private _stderr: ReturnType | null; - private _stdout: ReturnType | null; + private _onProcessEnd!: OnEnd; + private _fakeStream: PassThrough | null; + private _stdout: ReturnType | null; + private _stderr: ReturnType | null; + + private _exitPromise: Promise; + private _resolveExitPromise!: () => void; + private _forceExited: boolean; constructor(options: WorkerOptions) { this._options = options; + this._request = null; - this._stderr = null; - this._stdout = null; + this._fakeStream = null; + this._stdout = null; + this._stderr = null; + + this._exitPromise = new Promise(resolve => { + this._resolveExitPromise = resolve; + }); + this._forceExited = false; this.initialize(); } @@ -83,8 +96,8 @@ export default class ExperimentalWorker implements WorkerInterface { this._stderr.add(this._worker.stderr); } - this._worker.on('message', this.onMessage.bind(this)); - this._worker.on('exit', this.onExit.bind(this)); + this._worker.on('message', this._onMessage.bind(this)); + this._worker.on('exit', this._onExit.bind(this)); this._worker.postMessage([ CHILD_MESSAGE_INITIALIZE, @@ -101,7 +114,7 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._retries > this._options.maxRetries) { const error = new Error('Call retries were exceeded'); - this.onMessage([ + this._onMessage([ PARENT_MESSAGE_CLIENT_ERROR, error.name, error.message, @@ -117,9 +130,11 @@ export default class ExperimentalWorker implements WorkerInterface { this._fakeStream.end(); this._fakeStream = null; } + + this._resolveExitPromise(); } - onMessage(response: ParentMessage) { + private _onMessage(response: ParentMessage) { let error; switch (response[0]) { @@ -162,8 +177,8 @@ export default class ExperimentalWorker implements WorkerInterface { } } - onExit(exitCode: number) { - if (exitCode !== 0) { + private _onExit(exitCode: number) { + if (exitCode !== 0 && !this._forceExited) { this.initialize(); if (this._request) { @@ -174,6 +189,15 @@ export default class ExperimentalWorker implements WorkerInterface { } } + waitForExit() { + return this._exitPromise; + } + + forceExit() { + this._forceExited = true; + this._worker.terminate(); + } + send(request: ChildMessage, onProcessStart: OnStart, onProcessEnd: OnEnd) { onProcessStart(this); this._onProcessEnd = (...args) => { diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 72de719e409d..4c4462bb3b52 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -20,6 +20,8 @@ import { PARENT_MESSAGE_OK, } from '../../types'; +jest.useFakeTimers(); + let Worker; let forkInterface; let childProcess; @@ -32,6 +34,7 @@ beforeEach(() => { childProcess = require('child_process'); childProcess.fork.mockImplementation(() => { forkInterface = Object.assign(new EventEmitter(), { + kill: jest.fn(), send: jest.fn(), stderr: new PassThrough(), stdout: new PassThrough(), @@ -114,10 +117,10 @@ it('stops initializing the worker after the amount of retries is exceeded', () = worker.send(request, onProcessStart, onProcessEnd); // We fail four times (initial + three retries). - forkInterface.emit('exit'); - forkInterface.emit('exit'); - forkInterface.emit('exit'); - forkInterface.emit('exit'); + forkInterface.emit('exit', 1); + forkInterface.emit('exit', 1); + forkInterface.emit('exit', 1); + forkInterface.emit('exit', 1); expect(childProcess.fork).toHaveBeenCalledTimes(5); expect(onProcessStart).toBeCalledWith(worker); @@ -139,7 +142,7 @@ it('provides stdout and stderr from the child processes', async () => { forkInterface.stdout.end('Hello ', {encoding: 'utf8'}); forkInterface.stderr.end('Jest ', {encoding: 'utf8'}); - forkInterface.emit('exit'); + forkInterface.emit('exit', 1); forkInterface.stdout.end('World!', {encoding: 'utf8'}); forkInterface.stderr.end('Workers!', {encoding: 'utf8'}); forkInterface.emit('exit', 0); @@ -179,7 +182,7 @@ it('resends the task to the child process after a retry', () => { expect(forkInterface.send.mock.calls[1][0]).toEqual(request); const previousForkInterface = forkInterface; - forkInterface.emit('exit'); + forkInterface.emit('exit', 1); expect(forkInterface).not.toBe(previousForkInterface); @@ -298,6 +301,18 @@ it('does not restart the child if it cleanly exited', () => { expect(childProcess.fork).toHaveBeenCalledTimes(1); }); +it('resolves waitForExit() after the child process cleanly exited', async () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + expect(childProcess.fork).toHaveBeenCalledTimes(1); + forkInterface.emit('exit', 0); + await worker.waitForExit(); // should not timeout +}); + it('restarts the child when the child process dies', () => { new Worker({ workerPath: '/tmp/foo', @@ -307,3 +322,41 @@ it('restarts the child when the child process dies', () => { forkInterface.emit('exit', 1); expect(childProcess.fork).toHaveBeenCalledTimes(2); }); + +it('sends SIGTERM when forceExit() is called', async () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + worker.forceExit(); + expect(forkInterface.kill.mock.calls).toEqual([['SIGTERM']]); +}); + +it('sends SIGKILL some time after SIGTERM', async () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + worker.forceExit(); + jest.runAllTimers(); + expect(forkInterface.kill.mock.calls).toEqual([['SIGTERM'], ['SIGKILL']]); +}); + +it('does not send SIGKILL if SIGTERM exited the process', async () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + worker.forceExit(); + forkInterface.emit('exit', 143 /* SIGTERM exit code */); + await Promise.resolve(); + + jest.runAllTimers(); + expect(forkInterface.kill.mock.calls).toEqual([['SIGTERM']]); +}); diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index c603b7f913cc..281b3baae34b 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -20,7 +20,7 @@ import { } from '../../types'; let Worker; -let childProcess; +let workerThreads; let originalExecArgv; beforeEach(() => { @@ -31,6 +31,7 @@ beforeEach(() => { const thread = new EventEmitter(); thread.postMessage = jest.fn(); + thread.terminate = jest.fn(); thread.stdout = new PassThrough(); thread.stderr = new PassThrough(); return thread; @@ -42,8 +43,8 @@ beforeEach(() => { }); originalExecArgv = process.execArgv; - childProcess = require('worker_threads').Worker; - childProcess.postMessage = jest.fn(); + workerThreads = require('worker_threads').Worker; + workerThreads.postMessage = jest.fn(); Worker = require('../NodeThreadsWorker').default; }); @@ -54,7 +55,7 @@ afterEach(() => { }); it('passes fork options down to child_process.fork, adding the defaults', () => { - const child = require.resolve('../threadChild'); + const thread = require.resolve('../threadChild'); process.execArgv = ['--inspect', '-p']; @@ -68,8 +69,8 @@ it('passes fork options down to child_process.fork, adding the defaults', () => workerPath: '/tmp/foo/bar/baz.js', }); - expect(childProcess.mock.calls[0][0]).toBe(child.replace(/\.ts$/, '.js')); - expect(childProcess.mock.calls[0][1]).toEqual({ + expect(workerThreads.mock.calls[0][0]).toBe(thread.replace(/\.ts$/, '.js')); + expect(workerThreads.mock.calls[0][1]).toEqual({ eval: false, stderr: true, stdout: true, @@ -83,7 +84,7 @@ it('passes fork options down to child_process.fork, adding the defaults', () => }); }); -it('passes workerId to the child process and assign it to env.JEST_WORKER_ID', () => { +it('passes workerId to the thread and assign it to env.JEST_WORKER_ID', () => { new Worker({ forkOptions: {}, maxRetries: 3, @@ -91,12 +92,12 @@ it('passes workerId to the child process and assign it to env.JEST_WORKER_ID', ( workerPath: '/tmp/foo', }); - expect(childProcess.mock.calls[0][1].workerData.env.JEST_WORKER_ID).toEqual( + expect(workerThreads.mock.calls[0][1].workerData.env.JEST_WORKER_ID).toEqual( '3', ); }); -it('initializes the child process with the given workerPath', () => { +it('initializes the thread with the given workerPath', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, @@ -131,7 +132,7 @@ it('stops initializing the worker after the amount of retries is exceeded', () = worker._worker.emit('exit'); worker._worker.emit('exit'); - expect(childProcess).toHaveBeenCalledTimes(5); + expect(workerThreads).toHaveBeenCalledTimes(5); expect(onProcessStart).toBeCalledWith(worker); expect(onProcessEnd).toHaveBeenCalledTimes(1); expect(onProcessEnd.mock.calls[0][0]).toBeInstanceOf(Error); @@ -139,7 +140,7 @@ it('stops initializing the worker after the amount of retries is exceeded', () = expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); -it('provides stdout and stderr from the child processes', async () => { +it('provides stdout and stderr from the threads', async () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, @@ -160,7 +161,7 @@ it('provides stdout and stderr from the child processes', async () => { await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); }); -it('sends the task to the child process', () => { +it('sends the task to the thread', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, @@ -175,7 +176,7 @@ it('sends the task to the child process', () => { expect(worker._worker.postMessage.mock.calls[1][0]).toEqual(request); }); -it('resends the task to the child process after a retry', () => { +it('resends the task to the thread after a retry', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, @@ -282,7 +283,7 @@ it('creates error instances for known errors', () => { expect(callback3.mock.calls[0][0]).toBe(412); }); -it('throws when the child process returns a strange message', () => { +it('throws when the thread returns a strange message', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, @@ -297,24 +298,47 @@ it('throws when the child process returns a strange message', () => { }).toThrow(TypeError); }); -it('does not restart the child if it cleanly exited', () => { +it('does not restart the thread if it cleanly exited', () => { const worker = new Worker({ forkOptions: {}, maxRetries: 3, workerPath: '/tmp/foo', }); - expect(childProcess).toHaveBeenCalledTimes(1); + expect(workerThreads).toHaveBeenCalledTimes(1); worker._worker.emit('exit', 0); - expect(childProcess).toHaveBeenCalledTimes(1); + expect(workerThreads).toHaveBeenCalledTimes(1); }); -it('restarts the child when the child process dies', () => { +it('resolves waitForExit() after the thread cleanly exited', async () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + expect(workerThreads).toHaveBeenCalledTimes(1); + worker._worker.emit('exit', 0); + await worker.waitForExit(); // should not timeout +}); + +it('restarts the thread when the thread dies', () => { const worker = new Worker({ workerPath: '/tmp/foo', }); - expect(childProcess).toHaveBeenCalledTimes(1); + expect(workerThreads).toHaveBeenCalledTimes(1); worker._worker.emit('exit', 1); - expect(childProcess).toHaveBeenCalledTimes(2); + expect(workerThreads).toHaveBeenCalledTimes(2); +}); + +it('terminates the thread when forceExit() is called', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + worker.forceExit(); + expect(worker._worker.terminate).toHaveBeenCalled(); }); diff --git a/packages/jest-worker/src/workers/__tests__/processChild.test.js b/packages/jest-worker/src/workers/__tests__/processChild.test.js index df71b9b20144..c1dd240d525c 100644 --- a/packages/jest-worker/src/workers/__tests__/processChild.test.js +++ b/packages/jest-worker/src/workers/__tests__/processChild.test.js @@ -314,19 +314,17 @@ it('calls the main export if the method call is "default" and it is a Babel tran expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 67890]); }); -it('finishes the process with exit code 0 if requested', () => { +it('removes the message listener on END message', () => { + // So that there are no more open handles preventing Node from exiting process.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-worker', ]); - process.emit('message', [ - CHILD_MESSAGE_END, - true, // Not really used here, but for flow type purity. - ]); + process.emit('message', [CHILD_MESSAGE_END]); - expect(process.exit.mock.calls[0]).toEqual([0]); + expect(process.listenerCount('message')).toBe(0); }); it('calls the teardown method ', () => { diff --git a/packages/jest-worker/src/workers/__tests__/threadChild.test.js b/packages/jest-worker/src/workers/__tests__/threadChild.test.js index 5e8431b8df66..d62fedffc084 100644 --- a/packages/jest-worker/src/workers/__tests__/threadChild.test.js +++ b/packages/jest-worker/src/workers/__tests__/threadChild.test.js @@ -114,14 +114,11 @@ beforeEach(() => { thread = require('worker_threads').parentPort; - process.exit = jest.fn(); - // Require the child! require('../threadChild'); }); beforeEach(() => { - process.exit.mockClear(); thread.postMessage.mockClear(); }); @@ -341,7 +338,8 @@ it('calls the main export if the method call is "default" and it is a Babel tran ]); }); -it('finishes the process with exit code 0 if requested', () => { +it('removes the message listener on END message', () => { + // So that there are no more open handles preventing Node from exiting thread.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. @@ -353,7 +351,7 @@ it('finishes the process with exit code 0 if requested', () => { true, // Not really used here, but for flow type purity. ]); - expect(process.exit).toHaveBeenCalledWith(0); + expect(thread.listenerCount('message')).toBe(0); }); it('calls the teardown method ', () => { diff --git a/packages/jest-worker/src/workers/processChild.ts b/packages/jest-worker/src/workers/processChild.ts index fd964fc349d5..96a298cf33e9 100644 --- a/packages/jest-worker/src/workers/processChild.ts +++ b/packages/jest-worker/src/workers/processChild.ts @@ -34,7 +34,7 @@ let initialized = false; * If an invalid message is detected, the child will exit (by throwing) with a * non-zero exit code. */ -process.on('message', (request: any) => { +const messageListener = (request: any) => { switch (request[0]) { case CHILD_MESSAGE_INITIALIZE: const init: ChildMessageInitialize = request; @@ -56,7 +56,8 @@ process.on('message', (request: any) => { 'Unexpected request from parent process: ' + request[0], ); } -}); +}; +process.on('message', messageListener); function reportSuccess(result: any) { if (!process || !process.send) { @@ -105,7 +106,8 @@ function end(): void { } function exitProcess(): void { - process.exit(0); + // Clean up open handles so the process ideally exits gracefully + process.removeListener('message', messageListener); } function execMethod(method: string, args: Array): void { diff --git a/packages/jest-worker/src/workers/threadChild.ts b/packages/jest-worker/src/workers/threadChild.ts index 4e81c394b7d0..bab076dce2c1 100644 --- a/packages/jest-worker/src/workers/threadChild.ts +++ b/packages/jest-worker/src/workers/threadChild.ts @@ -38,7 +38,7 @@ let initialized = false; * If an invalid message is detected, the child will exit (by throwing) with a * non-zero exit code. */ -parentPort!.on('message', (request: any) => { +const messageListener = (request: any) => { switch (request[0]) { case CHILD_MESSAGE_INITIALIZE: const init: ChildMessageInitialize = request; @@ -60,7 +60,8 @@ parentPort!.on('message', (request: any) => { 'Unexpected request from parent process: ' + request[0], ); } -}); +}; +parentPort!.on('message', messageListener); function reportSuccess(result: any) { if (isMainThread) { @@ -109,7 +110,8 @@ function end(): void { } function exitProcess(): void { - process.exit(0); + // Clean up open handles so the worker ideally exits gracefully + parentPort!.removeListener('message', messageListener); } function execMethod(method: string, args: Array): void { diff --git a/yarn.lock b/yarn.lock index 382b862831b2..3253802a3faa 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6227,6 +6227,15 @@ find-up@3.0.0, find-up@^3.0.0: dependencies: locate-path "^3.0.0" +find-process@^1.4.1: + version "1.4.1" + resolved "https://registry.yarnpkg.com/find-process/-/find-process-1.4.1.tgz#628c576a494d1525a27673fb26c77af90db5db02" + integrity sha512-RkYWDeukxEoDKUyocqMGKAYuwhSwq77zL99gCqhX9czWon3otdlzihJ0MSZ6YWNKHyvS/MN2YR4+RGYOuIEANg== + dependencies: + chalk "^2.0.1" + commander "^2.11.0" + debug "^2.6.8" + find-up@^1.0.0: version "1.1.2" resolved "https://registry.yarnpkg.com/find-up/-/find-up-1.1.2.tgz#6b2e9822b1a2ce0a60ab64d610eccad53cb24d0f"