Skip to content

Commit

Permalink
feat(run): Add just-in-time queue management (#2045)
Browse files Browse the repository at this point in the history
* Implement dynamic graph traversal
* Centralize cycle warnings and don't prune centrally
  • Loading branch information
bweggersen authored and evocateur committed May 11, 2019
1 parent 5d84b61 commit 6eca172
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 62 deletions.
48 changes: 35 additions & 13 deletions commands/run/index.js
@@ -1,13 +1,13 @@
"use strict";

const pMap = require("p-map");
const PQueue = require("p-queue");

const Command = require("@lerna/command");
const npmRunScript = require("@lerna/npm-run-script");
const batchPackages = require("@lerna/batch-packages");
const runParallelBatches = require("@lerna/run-parallel-batches");
const output = require("@lerna/output");
const timer = require("@lerna/timer");
const QueryGraph = require("@lerna/query-graph");
const ValidationError = require("@lerna/validation-error");
const { getFilteredPackages } = require("@lerna/filter-options");

Expand Down Expand Up @@ -58,10 +58,6 @@ class RunCommand extends Command {
// still exits zero, aka "ok"
return false;
}

this.batchedPackages = this.toposort
? batchPackages(this.packagesWithScript, this.options.rejectCycles)
: [this.packagesWithScript];
});
}

Expand All @@ -77,10 +73,10 @@ class RunCommand extends Command {
let chain = Promise.resolve();
const getElapsed = timer();

if (this.options.parallel) {
if (this.options.parallel || !this.toposort) {
chain = chain.then(() => this.runScriptInPackagesParallel());
} else {
chain = chain.then(() => this.runScriptInPackagesBatched());
chain = chain.then(() => this.runScriptInPackagesTopological());
}

if (this.bail) {
Expand Down Expand Up @@ -130,18 +126,44 @@ class RunCommand extends Command {
};
}

runScriptInPackagesBatched() {
runScriptInPackagesTopological() {
const queue = new PQueue({ concurrency: this.concurrency });
const graph = new QueryGraph(this.packagesWithScript, this.options.rejectCycles);

const runner = this.options.stream
? pkg => this.runScriptInPackageStreaming(pkg)
: pkg => this.runScriptInPackageCapturing(pkg);

return runParallelBatches(this.batchedPackages, this.concurrency, runner).then(batchedResults =>
batchedResults.reduce((arr, batch) => arr.concat(batch), [])
);
return new Promise((resolve, reject) => {
const returnValues = [];

const queueNextAvailablePackages = () =>
graph.getAvailablePackages().forEach(({ pkg, name }) => {
graph.markAsTaken(name);

queue
.add(() =>
runner(pkg)
.then(value => returnValues.push(value))
.then(() => graph.markAsDone(pkg))
.then(() => queueNextAvailablePackages())
)
.catch(reject);
});

queueNextAvailablePackages();

return queue.onIdle().then(() => resolve(returnValues));
});
}

runScriptInPackagesParallel() {
return pMap(this.packagesWithScript, pkg => this.runScriptInPackageStreaming(pkg));
const runner =
this.options.parallel || this.options.stream
? pkg => this.runScriptInPackageStreaming(pkg)
: pkg => this.runScriptInPackageCapturing(pkg);

return pMap(this.packagesWithScript, runner);
}

runScriptInPackageStreaming(pkg) {
Expand Down
6 changes: 3 additions & 3 deletions commands/run/package.json
Expand Up @@ -35,14 +35,14 @@
"populate--": true
},
"dependencies": {
"@lerna/batch-packages": "file:../../utils/batch-packages",
"@lerna/command": "file:../../core/command",
"@lerna/filter-options": "file:../../core/filter-options",
"@lerna/npm-run-script": "file:../../utils/npm-run-script",
"@lerna/output": "file:../../utils/output",
"@lerna/run-parallel-batches": "file:../../utils/run-parallel-batches",
"@lerna/query-graph": "file:../../utils/query-graph",
"@lerna/timer": "file:../../utils/timer",
"@lerna/validation-error": "file:../../core/validation-error",
"p-map": "^1.2.0"
"p-map": "^1.2.0",
"p-queue": "^4.0.0"
}
}
4 changes: 4 additions & 0 deletions core/package-graph/__tests__/package-graph.test.js
Expand Up @@ -286,7 +286,9 @@ describe("PackageGraph", () => {
expect(paths.size).toBe(0);
expect(nodes.size).toBe(0);
});
});

describe(".pruneCycleNodes()", () => {
it("prunes direct cycles from the graph", () => {
const pkgs = [
new Package(
Expand All @@ -313,6 +315,7 @@ describe("PackageGraph", () => {
const graph = new PackageGraph(pkgs);
// deepInspect(graph);
const [paths, nodes] = graph.partitionCycles();
graph.pruneCycleNodes(nodes);
// deepInspect(nodes);

expect(graph.size).toBe(0);
Expand All @@ -338,6 +341,7 @@ Set {
const graph = new PackageGraph(pkgs);
// deepInspect(graph);
const [paths, nodes] = graph.partitionCycles();
graph.pruneCycleNodes(nodes);
// deepInspect(graph);
// deepInspect(nodes);

Expand Down
25 changes: 22 additions & 3 deletions core/package-graph/index.js
Expand Up @@ -2,6 +2,8 @@

const npa = require("npm-package-arg");
const semver = require("semver");
const log = require("npmlog");

const ValidationError = require("@lerna/validation-error");
const prereleaseIdFromVersion = require("@lerna/prerelease-id-from-version");

Expand Down Expand Up @@ -189,9 +191,10 @@ class PackageGraph extends Map {

/**
* Return a tuple of cycle paths and nodes, which have been removed from the graph.
* @param {!boolean} rejectCycles Whether or not to reject cycles
* @returns [Set<String[]>, Set<PackageGraphNode>]
*/
partitionCycles() {
partitionCycles(rejectCycles) {
const cyclePaths = new Set();
const cycleNodes = new Set();

Expand Down Expand Up @@ -235,13 +238,29 @@ class PackageGraph extends Map {
currentNode.localDependents.forEach(visits([currentName]));
});

if (cycleNodes.size) {
this.prune(...cycleNodes);
if (cyclePaths.size) {
const cycleMessage = ["Dependency cycles detected, you should fix these!"]
.concat(Array.from(cyclePaths).map(cycle => cycle.join(" -> ")))
.join("\n");

if (rejectCycles) {
throw new ValidationError("ECYCLE", cycleMessage);
}

log.warn("ECYCLE", cycleMessage);
}

return [cyclePaths, cycleNodes];
}

/**
* Remove cycle nodes.
* @param {Set<PackageGraphNode>} cycleNodes
*/
pruneCycleNodes(cycleNodes) {
return this.prune(...cycleNodes);
}

/**
* Remove all candidate nodes.
* @param {PackageGraphNode[]} candidates
Expand Down
1 change: 1 addition & 0 deletions core/package-graph/package.json
Expand Up @@ -34,6 +34,7 @@
"@lerna/prerelease-id-from-version": "file:../../utils/prerelease-id-from-version",
"@lerna/validation-error": "file:../validation-error",
"npm-package-arg": "^6.1.0",
"npmlog": "^4.1.2",
"semver": "^5.5.0"
}
}

0 comments on commit 6eca172

Please sign in to comment.