fix: worker being killed after being spawned and other worker bugs #13107

Merged
merged 53 commits into from Aug 11, 2022
Changes from 20 commits
71d0080
test: ad integration test case for worker not executing test
phawxby Aug 8, 2022
8c6ab73
test: add test case for worker being killed just once
phawxby Aug 9, 2022
86acdf1
fix: worker being killed after spawning
phawxby Aug 9, 2022
718425b
Merge branch 'master' into worker-restart-bug
phawxby Aug 9, 2022
de0c0e8
fix: missing license
phawxby Aug 9, 2022
d83aa84
fix: flaky tests
phawxby Aug 9, 2022
9435d0b
chore: add some diagnostic logging
phawxby Aug 9, 2022
29e1193
chore: break apart tests to see where the problem is occuring
phawxby Aug 9, 2022
c23f581
fix: handling of work ready status
phawxby Aug 9, 2022
e8a8058
chore: add fallback timeout
phawxby Aug 9, 2022
4e0b399
refactor: add abstract class with common functions and vars
phawxby Aug 9, 2022
790f772
fix: restarting on init
phawxby Aug 9, 2022
b4151ea
refactor: event emitting is a much cleaner way of tracking what's goi…
phawxby Aug 9, 2022
2753069
chore: better handle shutdown events
phawxby Aug 9, 2022
617e102
fix: only set out of memory when necessary
phawxby Aug 9, 2022
28230da
chore: windows debugging
phawxby Aug 9, 2022
60b5b1b
chore: try and debug single test
phawxby Aug 9, 2022
a9a3779
chore: remove debugging
phawxby Aug 9, 2022
cdf9627
chore: remove logging
phawxby Aug 9, 2022
7a79c92
chore: simplify tests and move more to abstract worker
phawxby Aug 9, 2022
11fd6f8
chore: only run flaky test
phawxby Aug 10, 2022
55f4097
chore: enable tests again
phawxby Aug 10, 2022
723951b
fix: missing snapshot
phawxby Aug 10, 2022
a062850
chore: try to resolve the flake
phawxby Aug 10, 2022
66c83bd
chore: logging
phawxby Aug 10, 2022
c47404d
chore: see if this changes debugging output
phawxby Aug 10, 2022
9331bd2
chore: make noisy
phawxby Aug 10, 2022
5058e14
chore: debugging
phawxby Aug 10, 2022
ddbfb5e
chore: more
phawxby Aug 10, 2022
811e228
chore: even more robust logging
phawxby Aug 10, 2022
197c0da
chore: refactor error streaming monitoring
phawxby Aug 10, 2022
0d0d03e
chore: error handling
phawxby Aug 10, 2022
11e8b25
chore: log this
phawxby Aug 10, 2022
8f427a4
fix: tests and try to detect process crashes differently on windows
phawxby Aug 10, 2022
57d3b20
chore: remove debug logging
phawxby Aug 10, 2022
d3abe14
chore: remove only
phawxby Aug 10, 2022
f704620
chore: add win32 specific logic to try and work around the issue
phawxby Aug 10, 2022
5a840bb
chore: back to adding debug logging
phawxby Aug 10, 2022
d6b2db0
chore: try adding more possible detections
phawxby Aug 10, 2022
202323b
chore: try a different event
phawxby Aug 10, 2022
9e7d081
chore: disconnect handling
phawxby Aug 10, 2022
75037d2
fix: test bug
phawxby Aug 10, 2022
37c5a43
chore: log child and buffer
phawxby Aug 10, 2022
3e8b3ae
chore: forgot to make public
phawxby Aug 10, 2022
fab410a
chore: log
phawxby Aug 10, 2022
8e4a31e
chore: use up memory quicker and more logging
phawxby Aug 10, 2022
b974eea
chore: try removing force exit to see if that helps with the confusin…
phawxby Aug 10, 2022
1e1023d
chore: different strat
phawxby Aug 10, 2022
ab2931d
chore: remove debugging
phawxby Aug 10, 2022
c610bdb
chore: revert to silent worker
phawxby Aug 11, 2022
ee20013
Update CHANGELOG.md
SimenB Aug 11, 2022
bd786b1
chore: pr feedback
phawxby Aug 11, 2022
613fcf2
chore: pr changes
phawxby Aug 11, 2022
10 changes: 10 additions & 0 deletions e2e/worker-restarting/__tests__/test1.js
@@ -0,0 +1,10 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

test('basic test', () => {
expect(true).toBeTruthy();
});
10 changes: 10 additions & 0 deletions e2e/worker-restarting/__tests__/test2.js
@@ -0,0 +1,10 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

test('basic test', () => {
expect(true).toBeTruthy();
});
10 changes: 10 additions & 0 deletions e2e/worker-restarting/__tests__/test3.js
@@ -0,0 +1,10 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

test('basic test', () => {
expect(true).toBeTruthy();
});
6 changes: 6 additions & 0 deletions e2e/worker-restarting/package.json
@@ -0,0 +1,6 @@
{
"jest": {
"maxWorkers": 2,
"workerIdleMemoryLimit": "1MB"
}
}
32 changes: 32 additions & 0 deletions packages/jest-worker/src/types.ts
@@ -66,12 +66,15 @@ export interface WorkerPoolInterface {
}

export interface WorkerInterface {
get state(): WorkerStates;

send(
request: ChildMessage,
onProcessStart: OnStart,
onProcessEnd: OnEnd,
onCustomMessage: OnCustomMessage,
): void;

waitForExit(): Promise<void>;
forceExit(): void;

@@ -83,6 +86,18 @@ export interface WorkerInterface {
*/
getWorkerSystemId(): number;
getMemoryUsage(): Promise<number | null>;
/**
* Checks to see if the child worker is actually running.
*/
isWorkerRunning(): boolean;
/**
* When the worker child is started and ready to start handling requests.
*
* @remarks
* This mostly exists to help with testing so that you don't check the status
* of things like isWorkerRunning before it actually is.
*/
waitForWorkerReady(): Promise<void>;
}

export type PoolExitResult = {
@@ -170,8 +185,21 @@ export type WorkerOptions = {
* the raw output of the worker.
*/
silent?: boolean;
/**
* Used to immediately bind event handlers.
*/
on?: {
[WorkerEvents.STATE_CHANGE]:
| onStateChangeHandler
| Array<onStateChangeHandler>;
};
};

export type onStateChangeHandler = (
state: WorkerStates,
oldState: WorkerStates,
) => void;

// Messages passed from the parent to the children.

export type MessagePort = typeof EventEmitter & {
@@ -265,3 +293,7 @@ export enum WorkerStates {
SHUTTING_DOWN = 'shutting-down',
SHUT_DOWN = 'shut-down',
}

export enum WorkerEvents {
STATE_CHANGE = 'state-change',
}
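The `state` getter, the `on` option and `WorkerEvents.STATE_CHANGE` added in this file suggest a worker that emits an event on every state transition and exposes a promise for "ready". `WorkerAbstract` is not shown in this part of the diff, so what follows is only a minimal sketch under that assumption. `SketchState`, `SketchWorker` and the `'starting'` / `'ok'` values are hypothetical names for illustration; only the `'state-change'` event name, the `'shut-down'` value and the `(state, oldState)` handler shape come from the diff.

```typescript
import {EventEmitter} from 'events';

// Hypothetical stand-in for WorkerStates (a const object instead of an enum).
const SketchState = {
  STARTING: 'starting',
  OK: 'ok',
  SHUT_DOWN: 'shut-down',
} as const;
type SketchState = (typeof SketchState)[keyof typeof SketchState];

// Event name as declared by WorkerEvents.STATE_CHANGE in the diff.
const STATE_CHANGE = 'state-change';

type StateChangeHandler = (state: SketchState, oldState: SketchState) => void;

class SketchWorker extends EventEmitter {
  private _state: SketchState = SketchState.STARTING;
  private _readyPromise: Promise<void>;
  private _resolveReady!: () => void;

  constructor(onStateChange?: StateChangeHandler | Array<StateChangeHandler>) {
    super();

    // Bind handlers in the constructor so no early transition is missed.
    const handlers =
      onStateChange === undefined
        ? []
        : Array.isArray(onStateChange)
          ? onStateChange
          : [onStateChange];
    for (const handler of handlers) {
      this.on(STATE_CHANGE, handler);
    }

    this._readyPromise = new Promise<void>(resolve => {
      this._resolveReady = resolve;
    });
  }

  get state(): SketchState {
    return this._state;
  }

  set state(value: SketchState) {
    // Only real transitions are reported.
    if (value === this._state) {
      return;
    }
    const oldState = this._state;
    this._state = value;
    this.emit(STATE_CHANGE, value, oldState);

    if (value === SketchState.OK) {
      this._resolveReady();
    }
  }

  // Resolves once the worker has reached the OK state.
  waitForWorkerReady(): Promise<void> {
    return this._readyPromise;
  }
}
```

In this sketch a test can pass a handler to the constructor, await `waitForWorkerReady()`, and then inspect the recorded transitions instead of polling the worker. Routing every assignment through one setter is what makes the `this._state = ...` to `this.state = ...` rename in `ChildProcessWorker.ts` observable from outside.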
109 changes: 52 additions & 57 deletions packages/jest-worker/src/workers/ChildProcessWorker.ts
@@ -7,7 +7,6 @@

import {ChildProcess, ForkOptions, fork} from 'child_process';
import {totalmem} from 'os';
import {PassThrough} from 'stream';
import mergeStream = require('merge-stream');
import {stdout as stdoutSupportsColor} from 'supports-color';
import {
@@ -27,13 +26,14 @@ import {
WorkerOptions,
WorkerStates,
} from '../types';
import WorkerAbstract from './WorkerAbstract';

const SIGNAL_BASE_EXIT_CODE = 128;
const SIGKILL_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 9;
const SIGTERM_EXIT_CODE = SIGNAL_BASE_EXIT_CODE + 15;

// How long to wait after SIGTERM before sending SIGKILL
const SIGKILL_DELAY = 500;
export const SIGKILL_DELAY = 500;

/**
* This class wraps the child process and provides a nice interface to
@@ -53,7 +53,10 @@ const SIGKILL_DELAY = 500;
* field is changed to "true", so that other workers which might encounter the
* same call skip it.
*/
export default class ChildProcessWorker implements WorkerInterface {
export default class ChildProcessWorker
extends WorkerAbstract
implements WorkerInterface
{
private _child!: ChildProcess;
private _options: WorkerOptions;

@@ -62,50 +65,42 @@ export default class ChildProcessWorker implements WorkerInterface {
private _onProcessEnd!: OnEnd;
private _onCustomMessage!: OnCustomMessage;

private _fakeStream: PassThrough | null;
private _stdout: ReturnType<typeof mergeStream> | null;
private _stderr: ReturnType<typeof mergeStream> | null;

private _exitPromise: Promise<void>;
private _resolveExitPromise!: () => void;

private _memoryUsagePromise: Promise<number> | undefined;
private _resolveMemoryUsage: ((arg0: number) => void) | undefined;

private _childIdleMemoryUsage: number | null;
private _childIdleMemoryUsageLimit: number | null;
private _memoryUsageCheck = false;
private _state: WorkerStates;

private _childWorkerPath: string;

constructor(options: WorkerOptions) {
super(options);

this._options = options;

this._request = null;

this._fakeStream = null;
this._stdout = null;
this._stderr = null;
this._childIdleMemoryUsage = null;
this._childIdleMemoryUsageLimit = options.idleMemoryLimit || null;

this._exitPromise = new Promise(resolve => {
this._resolveExitPromise = resolve;
});

this._childWorkerPath =
options.childWorkerPath || require.resolve('./processChild');

this._state = WorkerStates.STARTING;
this.state = WorkerStates.STARTING;
this.initialize();
}

initialize(): void {
if (
this._state === WorkerStates.OUT_OF_MEMORY ||
this._state === WorkerStates.SHUTTING_DOWN ||
this._state === WorkerStates.SHUT_DOWN
this.state === WorkerStates.OUT_OF_MEMORY ||
this.state === WorkerStates.SHUTTING_DOWN ||
this.state === WorkerStates.SHUT_DOWN
) {
return;
}
@@ -114,7 +109,7 @@ export default class ChildProcessWorker implements WorkerInterface {
this._child.kill('SIGKILL');
}

this._state = WorkerStates.STARTING;
this.state = WorkerStates.STARTING;

const forceColor = stdoutSupportsColor ? {FORCE_COLOR: '1'} : {};
const options: ForkOptions = {
@@ -132,42 +127,39 @@ export default class ChildProcessWorker implements WorkerInterface {
...this._options.forkOptions,
};

const child = fork(this._childWorkerPath, [], options);
this._child = fork(this._childWorkerPath, [], options);

if (child.stdout) {
if (this._child.stdout) {
if (!this._stdout) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stdout = mergeStream(this._getFakeStream());
}

this._stdout.add(child.stdout);
this._stdout.add(this._child.stdout);
}

if (child.stderr) {
if (this._child.stderr) {
if (!this._stderr) {
// We need to add a permanent stream to the merged stream to prevent it
// from ending when the subprocess stream ends
this._stderr = mergeStream(this._getFakeStream());
}

this._stderr.add(child.stderr);
this._stderr.add(this._child.stderr);
}

this._detectOutOfMemoryCrash(child);
child.on('message', this._onMessage.bind(this));
child.on('exit', this._onExit.bind(this));
this._detectOutOfMemoryCrash(this._child);
this._child.on('message', this._onMessage.bind(this));
this._child.on('exit', this._onExit.bind(this));

child.send([
this._child.send([
CHILD_MESSAGE_INITIALIZE,
false,
this._options.workerPath,
this._options.setupArgs,
]);

this._child = child;
this._state = WorkerStates.OK;

this._retries++;

// If we exceeded the amount of retries, we will emulate an error reply
@@ -189,13 +181,18 @@ export default class ChildProcessWorker implements WorkerInterface {
// Clear the request so we don't keep executing it.
this._request = null;
}

this.state = WorkerStates.OK;
if (this._resolveWorkerReady) {
this._resolveWorkerReady();
}
}

private _detectOutOfMemoryCrash(child: ChildProcess): void {
let stderrStr = '';

const handler = (chunk: any) => {
if (this._state !== WorkerStates.OUT_OF_MEMORY) {
if (this.state === WorkerStates.OK) {
let str: string | undefined = undefined;

if (chunk instanceof Buffer) {
@@ -209,26 +206,14 @@ export default class ChildProcessWorker implements WorkerInterface {
}

if (stderrStr.includes('heap out of memory')) {
this._state = WorkerStates.OUT_OF_MEMORY;
this.state = WorkerStates.OUT_OF_MEMORY;
}
}
};

child.stderr?.on('data', handler);
}

private _shutdown() {
this._state = WorkerStates.SHUTTING_DOWN;

// End the temporary streams so the merged streams end too
if (this._fakeStream) {
this._fakeStream.end();
this._fakeStream = null;
}

this._resolveExitPromise();
}

private _onMessage(response: ParentMessage) {
// TODO: Add appropriate type check
let error: any;
@@ -311,15 +296,18 @@ export default class ChildProcessWorker implements WorkerInterface {
this._childIdleMemoryUsage &&
this._childIdleMemoryUsage > limit
) {
this._state = WorkerStates.RESTARTING;
this.state = WorkerStates.RESTARTING;

this.killChild();
}
}
}

private _onExit(exitCode: number | null) {
if (exitCode !== 0 && this._state === WorkerStates.OUT_OF_MEMORY) {
this._workerReadyPromise = undefined;
this._resolveWorkerReady = undefined;

if (exitCode !== 0 && this.state === WorkerStates.OUT_OF_MEMORY) {
this._onProcessEnd(
new Error('Jest worker ran out of memory and crashed'),
null,
@@ -331,8 +319,8 @@ export default class ChildProcessWorker implements WorkerInterface {
exitCode !== null &&
exitCode !== SIGTERM_EXIT_CODE &&
exitCode !== SIGKILL_EXIT_CODE &&
this._state !== WorkerStates.SHUTTING_DOWN) ||
this._state === WorkerStates.RESTARTING
this.state !== WorkerStates.SHUTTING_DOWN) ||
this.state === WorkerStates.RESTARTING
) {
this.initialize();

@@ -353,11 +341,17 @@ export default class ChildProcessWorker implements WorkerInterface {
onProcessStart(this);

this._onProcessEnd = (...args) => {
const hasRequest = !!this._request;

// Clean the request to avoid sending past requests to workers that fail
// while waiting for a new request (timers, unhandled rejections...)
this._request = null;

if (this._childIdleMemoryUsageLimit && this._child.connected) {
if (
this._childIdleMemoryUsageLimit &&
this._child.connected &&
hasRequest
) {
this.checkMemoryUsage();
}

@@ -377,12 +371,16 @@ export default class ChildProcessWorker implements WorkerInterface {
}

killChild(): NodeJS.Timeout {
this._child.kill('SIGTERM');
return setTimeout(() => this._child.kill('SIGKILL'), SIGKILL_DELAY);
// We store a reference so that there's no way we can accidentally
// kill a new worker that has been spawned.
const childToKill = this._child;

childToKill.kill('SIGTERM');
return setTimeout(() => childToKill.kill('SIGKILL'), SIGKILL_DELAY);
}

forceExit(): void {
this._state = WorkerStates.SHUTTING_DOWN;
this.state = WorkerStates.SHUTTING_DOWN;

const sigkillTimeout = this.killChild();
this._exitPromise.then(() => clearTimeout(sigkillTimeout));
@@ -466,10 +464,7 @@ export default class ChildProcessWorker implements WorkerInterface {
}
}

private _getFakeStream() {
if (!this._fakeStream) {
this._fakeStream = new PassThrough();
}
return this._fakeStream;
isWorkerRunning(): boolean {
return this._child.connected && !this._child.killed;
}
}
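The `killChild()` change in this file captures the child in a local variable before scheduling the delayed `SIGKILL`. The sketch below illustrates why that matters when a replacement child is forked before the timer fires. It is a simplified model and not the PR's code: `FakeChild`, `RestartingWorkerSketch`, `simulateRestart` and the manual `Schedule` queue (standing in for `setTimeout` with `SIGKILL_DELAY`) are all hypothetical, and no real processes are spawned.

```typescript
type Signal = 'SIGTERM' | 'SIGKILL';

// Hypothetical stand-in for ChildProcess: it only records signals.
class FakeChild {
  readonly received: Array<Signal> = [];

  kill(signal: Signal): void {
    this.received.push(signal);
  }
}

// Stand-in for setTimeout: callbacks are queued and run on demand, which
// keeps the ordering deterministic.
type Schedule = (callback: () => void) => void;

class RestartingWorkerSketch {
  child: FakeChild = new FakeChild();

  // Pre-fix shape: the delayed callback reads `this.child` when it fires.
  killChildLate(schedule: Schedule): void {
    this.child.kill('SIGTERM');
    schedule(() => this.child.kill('SIGKILL'));
  }

  // Post-fix shape: the child is captured before the callback is scheduled.
  killChildCaptured(schedule: Schedule): void {
    const childToKill = this.child;
    childToKill.kill('SIGTERM');
    schedule(() => childToKill.kill('SIGKILL'));
  }

  // Models initialize() forking a replacement after the old child exits.
  respawn(): void {
    this.child = new FakeChild();
  }
}

function simulateRestart(capture: boolean): {
  first: Array<Signal>;
  replacement: Array<Signal>;
} {
  const pending: Array<() => void> = [];
  const schedule: Schedule = callback => {
    pending.push(callback);
  };

  const worker = new RestartingWorkerSketch();
  const first = worker.child;

  if (capture) {
    worker.killChildCaptured(schedule);
  } else {
    worker.killChildLate(schedule);
  }

  // The old child exits on SIGTERM and a new one is spawned before the
  // delayed SIGKILL callback runs.
  worker.respawn();
  for (const callback of pending) {
    callback();
  }

  return {first: first.received, replacement: worker.child.received};
}
```

In this model `simulateRestart(false)` delivers the delayed `SIGKILL` to the replacement child, while `simulateRestart(true)` delivers both signals to the original child and none to the replacement, which matches the intent stated in the comment added to `killChild()`.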