Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@lerna/run with just-in-time queue management #2045

Merged
merged 10 commits into from May 11, 2019
48 changes: 35 additions & 13 deletions commands/run/index.js
@@ -1,13 +1,13 @@
"use strict";

const pMap = require("p-map");
const PQueue = require("p-queue");

const Command = require("@lerna/command");
const npmRunScript = require("@lerna/npm-run-script");
const batchPackages = require("@lerna/batch-packages");
const runParallelBatches = require("@lerna/run-parallel-batches");
const output = require("@lerna/output");
const timer = require("@lerna/timer");
const QueryGraph = require("@lerna/query-graph");
const ValidationError = require("@lerna/validation-error");
const { getFilteredPackages } = require("@lerna/filter-options");

Expand Down Expand Up @@ -58,10 +58,6 @@ class RunCommand extends Command {
// still exits zero, aka "ok"
return false;
}

this.batchedPackages = this.toposort
evocateur marked this conversation as resolved.
Show resolved Hide resolved
? batchPackages(this.packagesWithScript, this.options.rejectCycles)
: [this.packagesWithScript];
});
}

Expand All @@ -77,10 +73,10 @@ class RunCommand extends Command {
let chain = Promise.resolve();
const getElapsed = timer();

if (this.options.parallel) {
if (this.options.parallel || !this.toposort) {
chain = chain.then(() => this.runScriptInPackagesParallel());
} else {
chain = chain.then(() => this.runScriptInPackagesBatched());
chain = chain.then(() => this.runScriptInPackagesTopological());
}

if (this.bail) {
Expand Down Expand Up @@ -130,18 +126,44 @@ class RunCommand extends Command {
};
}

runScriptInPackagesBatched() {
runScriptInPackagesTopological() {
const queue = new PQueue({ concurrency: this.concurrency });
const graph = new QueryGraph(this.packagesWithScript, this.options.rejectCycles);

const runner = this.options.stream
? pkg => this.runScriptInPackageStreaming(pkg)
: pkg => this.runScriptInPackageCapturing(pkg);

return runParallelBatches(this.batchedPackages, this.concurrency, runner).then(batchedResults =>
batchedResults.reduce((arr, batch) => arr.concat(batch), [])
);
return new Promise((resolve, reject) => {
const returnValues = [];

const queueNextAvailablePackages = () =>
graph.getAvailablePackages().forEach(({ pkg, name }) => {
graph.markAsTaken(name);

queue
.add(() =>
runner(pkg)
.then(value => returnValues.push(value))
.then(() => graph.markAsDone(pkg))
.then(() => queueNextAvailablePackages())
)
.catch(reject);
});

queueNextAvailablePackages();

return queue.onIdle().then(() => resolve(returnValues));
});
}

runScriptInPackagesParallel() {
return pMap(this.packagesWithScript, pkg => this.runScriptInPackageStreaming(pkg));
const runner =
this.options.parallel || this.options.stream
? pkg => this.runScriptInPackageStreaming(pkg)
: pkg => this.runScriptInPackageCapturing(pkg);

return pMap(this.packagesWithScript, runner);
}

runScriptInPackageStreaming(pkg) {
Expand Down
6 changes: 3 additions & 3 deletions commands/run/package.json
Expand Up @@ -35,14 +35,14 @@
"populate--": true
},
"dependencies": {
"@lerna/batch-packages": "file:../../utils/batch-packages",
"@lerna/command": "file:../../core/command",
"@lerna/filter-options": "file:../../core/filter-options",
"@lerna/npm-run-script": "file:../../utils/npm-run-script",
"@lerna/output": "file:../../utils/output",
"@lerna/run-parallel-batches": "file:../../utils/run-parallel-batches",
"@lerna/query-graph": "file:../../utils/query-graph",
"@lerna/timer": "file:../../utils/timer",
"@lerna/validation-error": "file:../../core/validation-error",
"p-map": "^1.2.0"
"p-map": "^1.2.0",
"p-queue": "^4.0.0"
}
}
4 changes: 4 additions & 0 deletions core/package-graph/__tests__/package-graph.test.js
Expand Up @@ -286,7 +286,9 @@ describe("PackageGraph", () => {
expect(paths.size).toBe(0);
expect(nodes.size).toBe(0);
});
});

describe(".pruneCycleNodes()", () => {
it("prunes direct cycles from the graph", () => {
const pkgs = [
new Package(
Expand All @@ -313,6 +315,7 @@ describe("PackageGraph", () => {
const graph = new PackageGraph(pkgs);
// deepInspect(graph);
const [paths, nodes] = graph.partitionCycles();
graph.pruneCycleNodes(nodes);
// deepInspect(nodes);

expect(graph.size).toBe(0);
Expand All @@ -338,6 +341,7 @@ Set {
const graph = new PackageGraph(pkgs);
// deepInspect(graph);
const [paths, nodes] = graph.partitionCycles();
graph.pruneCycleNodes(nodes);
// deepInspect(graph);
// deepInspect(nodes);

Expand Down
25 changes: 22 additions & 3 deletions core/package-graph/index.js
Expand Up @@ -2,6 +2,8 @@

const npa = require("npm-package-arg");
const semver = require("semver");
const log = require("npmlog");

const ValidationError = require("@lerna/validation-error");
const prereleaseIdFromVersion = require("@lerna/prerelease-id-from-version");

Expand Down Expand Up @@ -189,9 +191,10 @@ class PackageGraph extends Map {

/**
* Return a tuple of cycle paths and nodes, which have been removed from the graph.
* @param {!boolean} rejectCycles Whether or not to reject cycles
* @returns [Set<String[]>, Set<PackageGraphNode>]
*/
partitionCycles() {
partitionCycles(rejectCycles) {
const cyclePaths = new Set();
const cycleNodes = new Set();

Expand Down Expand Up @@ -235,13 +238,29 @@ class PackageGraph extends Map {
currentNode.localDependents.forEach(visits([currentName]));
});

if (cycleNodes.size) {
this.prune(...cycleNodes);
if (cyclePaths.size) {
const cycleMessage = ["Dependency cycles detected, you should fix these!"]
.concat(Array.from(cyclePaths).map(cycle => cycle.join(" -> ")))
.join("\n");

if (rejectCycles) {
throw new ValidationError("ECYCLE", cycleMessage);
}

log.warn("ECYCLE", cycleMessage);
}

return [cyclePaths, cycleNodes];
}

/**
* Remove cycle nodes.
* @param {Set<PackageGraphNode>} cycleNodes
*/
pruneCycleNodes(cycleNodes) {
return this.prune(...cycleNodes);
}

/**
* Remove all candidate nodes.
* @param {PackageGraphNode[]} candidates
Expand Down
1 change: 1 addition & 0 deletions core/package-graph/package.json
Expand Up @@ -34,6 +34,7 @@
"@lerna/prerelease-id-from-version": "file:../../utils/prerelease-id-from-version",
"@lerna/validation-error": "file:../validation-error",
"npm-package-arg": "^6.1.0",
"npmlog": "^4.1.2",
"semver": "^5.5.0"
}
}