Skip to content

Commit

Permalink
add public APIs for detection of parallel mode by a reporter, and the…
Browse files Browse the repository at this point in the history
… ability to swap the worker reporter with another

- made some APIs public
- refactor `ParallelBuffered` reporter into multiple methods which _may_ (took a wild-ass guess) be useful to override, if `ParallelBuffered` were to be subclassed
  • Loading branch information
boneskull committed Aug 28, 2020
1 parent 91abd7f commit bf91eac
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 44 deletions.
42 changes: 39 additions & 3 deletions lib/nodejs/parallel-buffered-runner.js
Expand Up @@ -15,6 +15,10 @@ const {setInterval, clearInterval} = global;
const {createMap, constants} = require('../utils');
const {MOCHA_ID_PROP_NAME} = constants;

const DEFAULT_WORKER_REPORTER = require.resolve(
'./reporters/parallel-buffered'
);

/**
* Outputs a debug statement with worker stats
* @param {BufferedWorkerPool} pool - Worker pool
Expand Down Expand Up @@ -57,7 +61,7 @@ const states = createMap({
/**
* This `Runner` delegates tests runs to worker threads. Does not execute any
* {@link Runnable}s by itself!
* @private
* @public
*/
class ParallelBufferedRunner extends Runner {
constructor(...args) {
Expand All @@ -77,6 +81,7 @@ class ParallelBufferedRunner extends Runner {
}
});

this._workerReporter = DEFAULT_WORKER_REPORTER;
this._linkPartialObjects = false;
this._linkedObjectMap = new Map();

Expand All @@ -90,6 +95,7 @@ class ParallelBufferedRunner extends Runner {
* @param {BufferedWorkerPool} pool - Worker pool
* @param {Options} options - Mocha options
* @returns {FileRunner} Mapping function
* @private
*/
_createFileRunner(pool, options) {
/**
Expand Down Expand Up @@ -206,6 +212,7 @@ class ParallelBufferedRunner extends Runner {
* Returns the listener for later call to `process.removeListener()`.
* @param {BufferedWorkerPool} pool - Worker pool
* @returns {SigIntListener} Listener
* @private
*/
_bindSigIntListener(pool) {
const sigIntListener = async () => {
Expand Down Expand Up @@ -249,15 +256,19 @@ class ParallelBufferedRunner extends Runner {
* @param {{files: string[], options: Options}} opts - Files to run and
* command-line options, respectively.
*/
run(callback, {files, options} = {}) {
run(callback, {files, options = {}} = {}) {
/**
* Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
*/
let sigIntListener;

// assign the reporter the worker will use, which will be different than the
// main process' reporter
options = {...options, reporter: this._workerReporter};

// This function should _not_ return a `Promise`; its parent (`Runner#run`)
// returns this instance, so this should do the same. However, we want to make
// use of `async`/`await`, so we use this IIFE.

(async () => {
/**
* This is an interval that outputs stats about the worker pool every so often
Expand Down Expand Up @@ -353,6 +364,31 @@ class ParallelBufferedRunner extends Runner {
this._linkPartialObjects = Boolean(value);
return super.linkPartialObjects(value);
}

/**
* If this class is the `Runner` in use, then this is going to return `true`.
*
* For use by reporters.
* @returns {true}
* @public
*/
isParallelMode() {
return true;
}

/**
* Configures an alternate reporter for worker processes to use. Subclasses
* using worker processes should implement this.
* @public
* @param {string} path - Absolute path to alternate reporter for worker processes to use
* @returns {Runner}
* @throws When in serial mode
* @chainable
*/
workerReporter(reporter) {
this._workerReporter = reporter;
return this;
}
}

module.exports = ParallelBufferedRunner;
Expand Down
90 changes: 61 additions & 29 deletions lib/nodejs/reporters/parallel-buffered.js
@@ -1,7 +1,7 @@
/**
* "Buffered" reporter used internally by a worker process when running in parallel mode.
* @module reporters/parallel-buffered
* @private
* @module nodejs/reporters/parallel-buffered
* @public
*/

'use strict';
Expand Down Expand Up @@ -53,15 +53,16 @@ const EVENT_NAMES = [
const ONCE_EVENT_NAMES = [EVENT_DELAY_BEGIN, EVENT_DELAY_END];

/**
* The `ParallelBuffered` reporter is for use by concurrent runs. Instead of outputting
* to `STDOUT`, etc., it retains a list of events it receives and hands these
* off to the callback passed into {@link Mocha#run}. That callback will then
* return the data to the main process.
* @private
* The `ParallelBuffered` reporter is used by each worker process in "parallel"
* mode, by default. Instead of reporting to to `STDOUT`, etc., it retains a
* list of events it receives and hands these off to the callback passed into
* {@link Mocha#run}. That callback will then return the data to the main
* process.
* @public
*/
class ParallelBuffered extends Base {
/**
* Listens for {@link Runner} events and retains them in an `events` instance prop.
* Calls {@link ParallelBuffered#createListeners}
* @param {Runner} runner
*/
constructor(runner, opts) {
Expand All @@ -70,50 +71,81 @@ class ParallelBuffered extends Base {
/**
* Retained list of events emitted from the {@link Runner} instance.
* @type {BufferedEvent[]}
* @memberOf Buffered
* @public
*/
const events = (this.events = []);
this.events = [];

/**
* mapping of event names to listener functions we've created,
* so we can cleanly _remove_ them from the runner once it's completed.
* Map of `Runner` event names to listeners (for later teardown)
* @public
* @type {Map<string,EventListener>}
*/
const listeners = new Map();
this.listeners = new Map();

/**
* Creates a listener for event `eventName` and adds it to the `listeners`
* map. This is a defensive measure, so that we don't a) leak memory or b)
* remove _other_ listeners that may not be associated with this reporter.
* @param {string} eventName - Event name
*/
const createListener = eventName =>
listeners
.set(eventName, (runnable, err) => {
events.push(SerializableEvent.create(eventName, runnable, err));
})
.get(eventName);
this.createListeners(runner);
}

/**
* Returns a new listener which saves event data in memory to
* {@link ParallelBuffered#events}. Listeners are indexed by `eventName` and stored
* in {@link ParallelBuffered#listeners}. This is a defensive measure, so that we
* don't a) leak memory or b) remove _other_ listeners that may not be
* associated with this reporter.
*
* Subclasses could override this behavior.
*
* @public
* @param {string} eventName - Name of event to create listener for
* @returns {EventListener}
*/
createListener(eventName) {
const listener = (runnable, err) => {
this.events.push(SerializableEvent.create(eventName, runnable, err));
};
return this.listeners.set(eventName, listener).get(eventName);
}

/**
* Creates event listeners (using {@link ParallelBuffered#createListener}) for each
* reporter-relevant event emitted by a {@link Runner}. This array is drained when
* {@link ParallelBuffered#done} is called by {@link Runner#run}.
*
* Subclasses could override this behavior.
* @public
* @param {Runner} runner - Runner instance
* @returns {ParallelBuffered}
* @chainable
*/
createListeners(runner) {
EVENT_NAMES.forEach(evt => {
runner.on(evt, createListener(evt));
runner.on(evt, this.createListener(evt));
});
ONCE_EVENT_NAMES.forEach(evt => {
runner.once(evt, createListener(evt));
runner.once(evt, this.createListener(evt));
});

runner.once(EVENT_RUN_END, () => {
debug('received EVENT_RUN_END');
listeners.forEach((listener, evt) => {
this.listeners.forEach((listener, evt) => {
runner.removeListener(evt, listener);
listeners.delete(evt);
this.listeners.delete(evt);
});
});

return this;
}

/**
* Calls the {@link Mocha#run} callback (`callback`) with the test failure
* count and the array of {@link BufferedEvent} objects. Resets the array.
*
* This is called directly by `Runner#run` and should not be called by any other consumer.
*
* Subclasses could override this.
*
* @param {number} failures - Number of failed tests
* @param {Function} callback - The callback passed to {@link Mocha#run}.
* @public
*/
done(failures, callback) {
callback(SerializableWorkerResult.create(this.events, failures));
Expand Down
4 changes: 0 additions & 4 deletions lib/nodejs/worker.js
Expand Up @@ -23,8 +23,6 @@ const isDebugEnabled = d.enabled(`mocha:parallel:worker:${process.pid}`);
const {serialize} = require('./serializer');
const {setInterval, clearInterval} = global;

const BUFFERED_REPORTER_PATH = require.resolve('./reporters/parallel-buffered');

let rootHooks;

if (workerpool.isMainThread) {
Expand Down Expand Up @@ -91,8 +89,6 @@ async function run(filepath, serializedOptions = '{}') {
}

const opts = Object.assign({ui: 'bdd'}, argv, {
// workers only use the `Buffered` reporter.
reporter: BUFFERED_REPORTER_PATH,
// if this was true, it would cause infinite recursion.
parallel: false,
// this doesn't work in parallel mode
Expand Down
25 changes: 25 additions & 0 deletions lib/runner.js
Expand Up @@ -1084,6 +1084,31 @@ Runner.prototype.abort = function() {
return this;
};

/**
* Returns `true` if Mocha is running in parallel mode. For reporters.
*
* Subclasses should return an appropriate value.
* @public
* @returns {false}
*/
Runner.prototype.isParallelMode = function isParallelMode() {
return false;
};

/**
* Configures an alternate reporter for worker processes to use. Subclasses
* using worker processes should implement this.
* @public
* @param {string} path - Absolute path to alternate reporter for worker processes to use
* @returns {Runner}
* @throws When in serial mode
* @chainable
* @abstract
*/
Runner.prototype.workerReporter = function() {
throw createUnsupportedError('workerReporter() not supported in serial mode');
};

/**
* Filter leaks with the given globals flagged as `ok`.
*
Expand Down
40 changes: 32 additions & 8 deletions test/node-unit/parallel-buffered-runner.spec.js
Expand Up @@ -135,7 +135,7 @@ describe('parallel-buffered-runner', function() {
});

it('should create object references', function() {
const options = {};
const options = {reporter: runner._workerReporter};
const someSuite = {
title: 'some suite',
[MOCHA_ID_PROP_NAME]: 'bar'
Expand Down Expand Up @@ -189,7 +189,7 @@ describe('parallel-buffered-runner', function() {

describe('when a worker fails', function() {
it('should recover', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
run.withArgs('some-file.js', options).rejects(new Error('whoops'));
run.withArgs('some-other-file.js', options).resolves({
failureCount: 0,
Expand Down Expand Up @@ -222,7 +222,7 @@ describe('parallel-buffered-runner', function() {
});

it('should delegate to Runner#uncaught', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
sinon.spy(runner, 'uncaught');
const err = new Error('whoops');
run.withArgs('some-file.js', options).rejects(new Error('whoops'));
Expand Down Expand Up @@ -304,7 +304,7 @@ describe('parallel-buffered-runner', function() {
describe('when an event contains an error and has positive failures', function() {
describe('when subsequent files have not yet been run', function() {
it('should cleanly terminate the thread pool', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
const err = {
__type: 'Error',
message: 'oh no'
Expand Down Expand Up @@ -354,7 +354,7 @@ describe('parallel-buffered-runner', function() {
});
describe('when subsequent files already started running', function() {
it('should cleanly terminate the thread pool', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
const err = {
__type: 'Error',
message: 'oh no'
Expand Down Expand Up @@ -466,7 +466,7 @@ describe('parallel-buffered-runner', function() {
describe('when an event contains an error and has positive failures', function() {
describe('when subsequent files have not yet been run', function() {
it('should cleanly terminate the thread pool', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
const err = {
__type: 'Error',
message: 'oh no'
Expand Down Expand Up @@ -510,7 +510,7 @@ describe('parallel-buffered-runner', function() {

describe('when subsequent files already started running', function() {
it('should cleanly terminate the thread pool', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
const err = {
__type: 'Error',
message: 'oh no'
Expand Down Expand Up @@ -570,7 +570,7 @@ describe('parallel-buffered-runner', function() {

describe('when subsequent files have not yet been run', function() {
it('should cleanly terminate the thread pool', function(done) {
const options = {};
const options = {reporter: runner._workerReporter};
const err = {
__type: 'Error',
message: 'oh no'
Expand Down Expand Up @@ -628,6 +628,30 @@ describe('parallel-buffered-runner', function() {

// avoid testing implementation details; don't check _linkPartialObjects
});

describe('isParallelMode()', function() {
let runner;

beforeEach(function() {
runner = new ParallelBufferedRunner(suite);
});

it('should return true', function() {
expect(runner.isParallelMode(), 'to be true');
});
});

describe('workerReporter()', function() {
let runner;

beforeEach(function() {
runner = new ParallelBufferedRunner(suite);
});

it('should return its context', function() {
expect(runner.workerReporter(), 'to be', runner);
});
});
});
});
});

0 comments on commit bf91eac

Please sign in to comment.