/
tasks-runner-v2.ts
102 lines (89 loc) · 2.83 KB
/
tasks-runner-v2.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import { Observable } from 'rxjs';
import {
AffectedEventType,
Task,
TaskCompleteEvent,
TasksRunner
} from './tasks-runner';
import { ProjectGraph } from '../core/project-graph';
import { NxJson } from '../core/shared-interfaces';
import { TaskOrderer } from './task-orderer';
import { TaskOrchestrator } from './task-orchestrator';
export interface RemoteCache {
retrieve: (hash: string, cacheDirectory: string) => Promise<boolean>;
store: (hash: string, cacheDirectory: string) => Promise<boolean>;
}
export interface LifeCycle {
startTask(task: Task): void;
endTask(task: Task, code: number): void;
}
class NoopLifeCycle implements LifeCycle {
startTask(task: Task): void {}
endTask(task: Task, code: number): void {}
}
export interface DefaultTasksRunnerOptions {
parallel?: boolean;
maxParallel?: number;
cacheableOperations?: string[];
cacheDirectory?: string;
remoteCache?: RemoteCache;
lifeCycle?: LifeCycle;
captureStderr?: boolean;
}
export const tasksRunnerV2: TasksRunner<DefaultTasksRunnerOptions> = (
tasks: Task[],
options: DefaultTasksRunnerOptions,
context: { target: string; projectGraph: ProjectGraph; nxJson: NxJson }
): Observable<TaskCompleteEvent> => {
if (!options.lifeCycle) {
options.lifeCycle = new NoopLifeCycle();
}
return new Observable(subscriber => {
runAllTasks(tasks, options, context)
.then(data => data.forEach(d => subscriber.next(d)))
.catch(e => {
console.error('Unexpected error:');
console.error(e);
process.exit(1);
})
.finally(() => {
subscriber.complete();
// fix for https://github.com/nrwl/nx/issues/1666
if (process.stdin['unref']) (process.stdin as any).unref();
});
});
};
async function runAllTasks(
tasks: Task[],
options: DefaultTasksRunnerOptions,
context: { target: string; projectGraph: ProjectGraph; nxJson: NxJson }
): Promise<Array<{ task: Task; type: any; success: boolean }>> {
const stages = new TaskOrderer(
context.target,
context.projectGraph
).splitTasksIntoStages(tasks);
const orchestrator = new TaskOrchestrator(context.projectGraph, options);
const res = [];
for (let i = 0; i < stages.length; ++i) {
const tasksInStage = stages[i];
const statuses = await orchestrator.run(tasksInStage);
res.push(...statuses);
// any task failed, we need to skip further stages
if (statuses.find(s => !s.success)) {
res.push(...markStagesAsNotSuccessful(stages.splice(i + 1)));
return res;
}
}
return res;
}
function markStagesAsNotSuccessful(stages: Task[][]) {
return stages.reduce((m, c) => [...m, ...tasksToStatuses(c, false)], []);
}
function tasksToStatuses(tasks: Task[], success: boolean) {
return tasks.map(task => ({
task,
type: AffectedEventType.TaskComplete,
success
}));
}
export default tasksRunnerV2;