Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel mode: enable custom worker reporters and object references #4409

Merged
merged 3 commits into from Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .wallaby.js
Expand Up @@ -17,7 +17,8 @@ module.exports = () => {
},
'package.json',
'test/opts/mocha.opts',
'mocharc.yml'
'mocharc.yml',
'!lib/browser/growl.js'
],
filesWithNoCoverageCalculated: [
'test/**/*.fixture.js',
Expand Down
23 changes: 14 additions & 9 deletions lib/hook.js
@@ -1,7 +1,8 @@
'use strict';

var Runnable = require('./runnable');
var inherits = require('./utils').inherits;
const {inherits, constants} = require('./utils');
const {MOCHA_ID_PROP_NAME} = constants;

/**
* Expose `Hook`.
Expand Down Expand Up @@ -63,16 +64,20 @@ Hook.prototype.serialize = function serialize() {
return {
$$isPending: this.isPending(),
$$titlePath: this.titlePath(),
ctx: {
currentTest: {
title: this.ctx && this.ctx.currentTest && this.ctx.currentTest.title
}
},
ctx:
this.ctx && this.ctx.currentTest
? {
currentTest: {
title: this.ctx.currentTest.title,
[MOCHA_ID_PROP_NAME]: this.ctx.currentTest.id
}
}
: {},
parent: {
root: this.parent.root,
title: this.parent.title
[MOCHA_ID_PROP_NAME]: this.parent.id
},
title: this.title,
type: this.type
type: this.type,
[MOCHA_ID_PROP_NAME]: this.id
};
};
154 changes: 137 additions & 17 deletions lib/nodejs/parallel-buffered-runner.js
Expand Up @@ -12,7 +12,13 @@ const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
const debug = require('debug')('mocha:parallel:parallel-buffered-runner');
const {BufferedWorkerPool} = require('./buffered-worker-pool');
const {setInterval, clearInterval} = global;
const {createMap} = require('../utils');
const {createMap, constants} = require('../utils');
const {MOCHA_ID_PROP_NAME} = constants;
const {createFatalError} = require('../errors');

const DEFAULT_WORKER_REPORTER = require.resolve(
'./reporters/parallel-buffered'
);

/**
* List of options to _not_ serialize for transmission to workers
Expand Down Expand Up @@ -68,7 +74,7 @@ const states = createMap({
/**
* This `Runner` delegates tests runs to worker threads. Does not execute any
* {@link Runnable}s by itself!
* @private
* @public
*/
class ParallelBufferedRunner extends Runner {
constructor(...args) {
Expand All @@ -88,6 +94,10 @@ class ParallelBufferedRunner extends Runner {
}
});

this._workerReporter = DEFAULT_WORKER_REPORTER;
this._linkPartialObjects = false;
this._linkedObjectMap = new Map();

this.once(Runner.constants.EVENT_RUN_END, () => {
this._state = COMPLETE;
});
Expand All @@ -98,12 +108,68 @@ class ParallelBufferedRunner extends Runner {
* @param {BufferedWorkerPool} pool - Worker pool
* @param {Options} options - Mocha options
* @returns {FileRunner} Mapping function
* @private
*/
_createFileRunner(pool, options) {
/**
* Emits event and sets `BAILING` state, if necessary.
* @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
* @param {number} failureCount - Failure count
*/
const emitEvent = (event, failureCount) => {
this.emit(event.eventName, event.data, event.error);
if (
this._state !== BAILING &&
event.data &&
event.data._bail &&
(failureCount || event.error)
) {
debug('run(): nonzero failure count & found bail flag');
// we need to let the events complete for this file, as the worker
// should run any cleanup hooks
this._state = BAILING;
}
};

/**
* Given an event, recursively find any objects in its data that have ID's, and create object references to already-seen objects.
* @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
*/
const linkEvent = event => {
const stack = [{parent: event, prop: 'data'}];
while (stack.length) {
const {parent, prop} = stack.pop();
const obj = parent[prop];
let newObj;
if (obj && typeof obj === 'object') {
if (obj[MOCHA_ID_PROP_NAME]) {
const id = obj[MOCHA_ID_PROP_NAME];
newObj = this._linkedObjectMap.has(id)
? Object.assign(this._linkedObjectMap.get(id), obj)
: obj;
this._linkedObjectMap.set(id, newObj);
parent[prop] = newObj;
} else {
throw createFatalError(
'Object missing ID received in event data',
obj
);
}
}
Object.keys(newObj).forEach(key => {
const value = obj[key];
if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
stack.push({obj: value, parent: newObj, prop: key});
}
});
}
};

return async file => {
debug('run(): enqueueing test file %s', file);
try {
const {failureCount, events} = await pool.run(file, options);

if (this._state === BAILED) {
// short-circuit after a graceful bail. if this happens,
// some other worker has bailed.
Expand All @@ -119,20 +185,18 @@ class ParallelBufferedRunner extends Runner {
);
this.failures += failureCount; // can this ever be non-numeric?
let event = events.shift();
while (event) {
this.emit(event.eventName, event.data, event.error);
if (
this._state !== BAILING &&
event.data &&
event.data._bail &&
(failureCount || event.error)
) {
debug('run(): nonzero failure count & found bail flag');
// we need to let the events complete for this file, as the worker
// should run any cleanup hooks
this._state = BAILING;

if (this._linkPartialObjects) {
while (event) {
linkEvent(event);
emitEvent(event, failureCount);
event = events.shift();
}
} else {
while (event) {
emitEvent(event, failureCount);
event = events.shift();
}
event = events.shift();
}
if (this._state === BAILING) {
debug('run(): terminating pool due to "bail" flag');
Expand Down Expand Up @@ -166,6 +230,7 @@ class ParallelBufferedRunner extends Runner {
* Returns the listener for later call to `process.removeListener()`.
* @param {BufferedWorkerPool} pool - Worker pool
* @returns {SigIntListener} Listener
* @private
*/
_bindSigIntListener(pool) {
const sigIntListener = async () => {
Expand Down Expand Up @@ -209,15 +274,19 @@ class ParallelBufferedRunner extends Runner {
* @param {{files: string[], options: Options}} opts - Files to run and
* command-line options, respectively.
*/
run(callback, {files, options} = {}) {
run(callback, {files, options = {}} = {}) {
/**
* Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
*/
let sigIntListener;

// assign the reporter the worker will use, which will be different than the
// main process' reporter
options = {...options, reporter: this._workerReporter};

// This function should _not_ return a `Promise`; its parent (`Runner#run`)
// returns this instance, so this should do the same. However, we want to make
// use of `async`/`await`, so we use this IIFE.

(async () => {
/**
* This is an interval that outputs stats about the worker pool every so often
Expand Down Expand Up @@ -293,6 +362,57 @@ class ParallelBufferedRunner extends Runner {
})();
return this;
}

/**
* Toggle partial object linking behavior; used for building object references from
* unique ID's.
* @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable
* @returns {Runner}
* @chainable
* @public
* @example
* // this reporter needs proper object references when run in parallel mode
* class MyReporter() {
* constructor(runner) {
* this.runner.linkPartialObjects(true)
* .on(EVENT_SUITE_BEGIN, suite => {
// this Suite may be the same object...
* })
* .on(EVENT_TEST_BEGIN, test => {
* // ...as the `test.parent` property
* });
* }
* }
*/
linkPartialObjects(value) {
this._linkPartialObjects = Boolean(value);
return super.linkPartialObjects(value);
}

/**
* If this class is the `Runner` in use, then this is going to return `true`.
*
* For use by reporters.
* @returns {true}
* @public
*/
isParallelMode() {
return true;
}

/**
* Configures an alternate reporter for worker processes to use. Subclasses
* using worker processes should implement this.
* @public
* @param {string} path - Absolute path to alternate reporter for worker processes to use
* @returns {Runner}
* @throws When in serial mode
* @chainable
*/
workerReporter(reporter) {
this._workerReporter = reporter;
return this;
}
}

module.exports = ParallelBufferedRunner;
Expand Down