/
TaskRunner.js
117 lines (89 loc) · 2.84 KB
/
TaskRunner.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import os from 'os';
import pLimit from 'p-limit';
import Worker from 'jest-worker';
import serialize from 'serialize-javascript';
import minify from './minify';
// Resolved path of the `./worker` module; it is handed to the jest-worker
// `Worker` constructor when `TaskRunner#run` decides to run tasks in parallel.
const workerPath = require.resolve('./worker');
/**
 * Runs minification tasks for a list of files, optionally spreading the work
 * across a jest-worker farm and consulting a cache before/after each task.
 */
export default class TaskRunner {
  /**
   * @param {object} [options]
   * @param {Function} [options.taskGenerator] Called with a file; must return
   *   an iterator whose first yielded value is the task (or a falsy value).
   * @param {Array} [options.files] Files to generate tasks for.
   * @param {object} [options.cache] Object exposing `isEnabled()`,
   *   `get(task)` and `store(task, result)`.
   * @param {boolean|number} [options.parallel] `true` to use all cores but
   *   one, or a number used as the upper bound for the worker count.
   */
  constructor(options = {}) {
    this.taskGenerator = options.taskGenerator;
    this.files = options.files;
    this.cache = options.cache;
    this.availableNumberOfCores = TaskRunner.getAvailableNumberOfCores(
      options.parallel
    );
  }

  /**
   * Computes how many cores may be used for workers.
   *
   * @param {boolean|number} parallel The `parallel` option.
   * @returns {number} `cpus - 1` when `parallel === true`, otherwise the
   *   smaller of `Number(parallel) || 0` and `cpus - 1`.
   */
  static getAvailableNumberOfCores(parallel) {
    // In some cases cpus() returns undefined
    // https://github.com/nodejs/node/issues/19022
    const cpus = os.cpus() || { length: 1 };

    return parallel === true
      ? cpus.length - 1
      : Math.min(Number(parallel) || 0, cpus.length - 1);
  }

  /**
   * Executes a single task, in the worker farm when one exists, otherwise
   * in the current process.
   *
   * @param {object} task The task to run.
   * @returns {Promise<object>} The task result.
   */
  async runTask(task) {
    if (this.worker) {
      // The task is serialized to a string before being sent to the worker.
      return this.worker.transform(serialize(task));
    }

    return minify(task);
  }

  /**
   * Generates and runs one task per file, honoring the cache when enabled.
   *
   * @returns {Promise<Array>} One entry per file, in `files` order. An entry
   *   is the task result for executed tasks, `undefined` when no task could
   *   be generated, and whatever `task.callback` returned on a cache hit.
   */
  async run() {
    const { availableNumberOfCores, cache, files, taskGenerator } = this;

    // Nothing to do. Bail out before computing the worker count: with zero
    // files `numWorkers` would be 0, and `pLimit(0)` throws a TypeError.
    if (files.length === 0) {
      return [];
    }

    let concurrency = Infinity;

    if (availableNumberOfCores > 0) {
      // Do not create unnecessary workers when the number of files is less than the available cores, it saves memory
      const numWorkers = Math.min(files.length, availableNumberOfCores);

      concurrency = numWorkers;

      this.worker = new Worker(workerPath, { numWorkers });

      // Show syntax error from jest-worker
      // https://github.com/facebook/jest/issues/8872#issuecomment-524822081
      const workerStderr = this.worker.getStderr();

      if (workerStderr) {
        workerStderr.pipe(process.stderr);
      }
    }

    const limit = pLimit(concurrency);

    // Runs the task, stores a successful result in the cache (best effort:
    // a failing store does not fail the task) and reports to the callback.
    const enqueue = async (task) => {
      let taskResult;

      try {
        taskResult = await this.runTask(task);
      } catch (error) {
        // Failures are reported through the result instead of rejecting.
        taskResult = { error };
      }

      if (cache.isEnabled() && !taskResult.error) {
        taskResult = await cache.store(task, taskResult).then(
          () => taskResult,
          () => taskResult
        );
      }

      task.callback(taskResult);

      return taskResult;
    };

    const scheduledTasks = [];

    for (const file of files) {
      scheduledTasks.push(
        limit(() => {
          const task = taskGenerator(file).next().value;

          if (!task) {
            // Something went wrong, for example the `cacheKeys` option throw an error
            return Promise.resolve();
          }

          if (cache.isEnabled()) {
            // A rejected lookup is treated as a cache miss and runs the task.
            return cache.get(task).then(
              (taskResult) => task.callback(taskResult),
              () => enqueue(task)
            );
          }

          return enqueue(task);
        })
      );
    }

    return Promise.all(scheduledTasks);
  }

  /**
   * Shuts down the worker farm, if one was created by `run()`.
   *
   * @returns {Promise} Resolves once the worker farm has ended, or
   *   immediately when there is none.
   */
  async exit() {
    if (!this.worker) {
      return Promise.resolve();
    }

    return this.worker.end();
  }
}