forked from jestjs/jest
/
BaseWorkerPool.ts
99 lines (79 loc) · 2.3 KB
/
BaseWorkerPool.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
import path from 'path';
import mergeStream from 'merge-stream';
import {
CHILD_MESSAGE_END,
WorkerPoolOptions,
WorkerOptions,
WorkerInterface,
} from '../types';
/**
 * No-op callback: takes no arguments, does nothing, and returns `undefined`.
 * Passed wherever a callback is required but its invocation is irrelevant.
 */
/* istanbul ignore next */
function emptyMethod(): void {}
/**
 * Base implementation of a pool of workers.
 *
 * On construction it creates `options.numWorkers` workers through
 * `createWorker` (which subclasses must override) and merges every worker's
 * stdout and stderr into one combined stream each.
 */
export default class BaseWorkerPool {
  _stderr: NodeJS.ReadableStream;
  _stdout: NodeJS.ReadableStream;
  _options: WorkerPoolOptions;
  _workers: Array<WorkerInterface>;

  /**
   * @param workerPath Path of the worker module. A non-absolute path is
   *   resolved with `require.resolve` before being handed to the workers.
   * @param options Pool options; `numWorkers`, `forkOptions`, `maxRetries`
   *   and `setupArgs` are read here.
   */
  constructor(workerPath: string, options: WorkerPoolOptions) {
    this._options = options;
    this._workers = new Array(options.numWorkers);

    // Workers always receive an absolute module path.
    const resolvedWorkerPath = path.isAbsolute(workerPath)
      ? workerPath
      : require.resolve(workerPath);

    const mergedStdout = mergeStream();
    const mergedStderr = mergeStream();

    const {forkOptions, maxRetries, setupArgs} = options;

    for (let workerId = 0; workerId < options.numWorkers; workerId += 1) {
      const worker = this.createWorker({
        forkOptions,
        maxRetries,
        setupArgs,
        workerId,
        workerPath: resolvedWorkerPath,
      });

      const workerStdout = worker.getStdout();
      const workerStderr = worker.getStderr();

      // A worker may not expose a stream; only merge the ones it provides.
      if (workerStdout) {
        mergedStdout.add(workerStdout);
      }

      if (workerStderr) {
        mergedStderr.add(workerStderr);
      }

      this._workers[workerId] = worker;
    }

    this._stdout = mergedStdout;
    this._stderr = mergedStderr;
  }

  /** Returns the stream that merges the stderr of all workers. */
  getStderr(): NodeJS.ReadableStream {
    return this._stderr;
  }

  /** Returns the stream that merges the stdout of all workers. */
  getStdout(): NodeJS.ReadableStream {
    return this._stdout;
  }

  /** Returns the pool's worker array itself (not a copy). */
  getWorkers(): Array<WorkerInterface> {
    return this._workers;
  }

  /**
   * Returns the worker stored at index `workerId`. No bounds check is
   * performed, so an unknown id yields `undefined` at runtime.
   */
  getWorkerById(workerId: number): WorkerInterface {
    return this._workers[workerId];
  }

  /**
   * Factory for a single worker; must be overridden by subclasses.
   *
   * @throws Error always, in this base implementation.
   */
  createWorker(_workerOptions: WorkerOptions): WorkerInterface {
    throw new Error('Missing method createWorker in WorkerPool');
  }

  /** Sends the end message to every worker in the pool. */
  end(): void {
    // The request object is deliberately not cached: a fresh one is built for
    // each worker. A cached request would only be processed by one of the
    // workers, and we want them all to close.
    for (const worker of this._workers) {
      worker.send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod);
    }
  }
}