| /** |
| * A test Runner that uses a {@link module:buffered-worker-pool}. |
| * @module parallel-buffered-runner |
| * @private |
| */ |
| |
| 'use strict'; |
| |
| const allSettled = require('@ungap/promise-all-settled').bind(Promise); |
| const Runner = require('../runner'); |
| const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants; |
| const debug = require('debug')('mocha:parallel:parallel-buffered-runner'); |
| const {BufferedWorkerPool} = require('./buffered-worker-pool'); |
| const {setInterval, clearInterval} = global; |
| const {createMap, constants} = require('../utils'); |
| const {MOCHA_ID_PROP_NAME} = constants; |
| const {createFatalError} = require('../errors'); |
| |
/**
 * Resolved path of the reporter module handed to workers (as the `reporter`
 * option) unless a different one is configured through
 * `ParallelBufferedRunner#workerReporter`.
 */
const DEFAULT_WORKER_REPORTER = require.resolve(
  './reporters/parallel-buffered'
);
| |
/**
 * Names of options that are deleted from the options object before it is
 * passed along to the worker pool; these are _not_ serialized for
 * transmission to workers.
 */
const DENY_OPTIONS = [
  'globalSetup',
  'globalTeardown',
  // prettier-ignore
  'parallel', 'p',
  // prettier-ignore
  'jobs', 'j'
];
| |
/**
 * Writes a one-line summary of the worker pool's current statistics (busy,
 * total and idle workers, plus queued tasks) to the debug log.
 * @param {BufferedWorkerPool} pool - Worker pool; its `stats()` method is
 * called to obtain the numbers
 */
/* istanbul ignore next */
const debugStats = pool => {
  const snapshot = pool.stats();
  const total = snapshot.totalWorkers;
  const busy = snapshot.busyWorkers;
  const idle = snapshot.idleWorkers;
  const queued = snapshot.pendingTasks;
  debug('%d/%d busy workers; %d idle; %d tasks queued', busy, total, idle, queued);
};
| |
/**
 * How often worker pool stats are written to the debug log while a run is in
 * progress. Passed to `setInterval`, so the unit is milliseconds.
 */
const DEBUG_STATS_INTERVAL = 5 * 1000;
| |
// Names of the states the runner can be in.
const ABORTED = 'ABORTED';
const IDLE = 'IDLE';
const ABORTING = 'ABORTING';
const RUNNING = 'RUNNING';
const BAILING = 'BAILING';
const BAILED = 'BAILED';
const COMPLETE = 'COMPLETE';

/**
 * Transition table of the runner's state machine: each key is a state name and
 * its value is the `Set` of states that may directly follow it. A state mapped
 * to an empty `Set` is terminal.
 */
const states = createMap(
  [
    [IDLE, [RUNNING, ABORTING]],
    [RUNNING, [COMPLETE, BAILING, ABORTING]],
    [COMPLETE, []],
    [ABORTED, []],
    [ABORTING, [ABORTED]],
    [BAILING, [BAILED, ABORTING]],
    [BAILED, [COMPLETE, ABORTING]]
  ].reduce((transitions, [from, targets]) => {
    transitions[from] = new Set(targets);
    return transitions;
  }, {})
);
| |
/**
 * This `Runner` delegates tests runs to worker threads. Does not execute any
 * {@link Runnable}s by itself!
 * @public
 */
class ParallelBufferedRunner extends Runner {
  /**
   * Passes all arguments through to `Runner`, then installs the `_state`
   * property and the defaults used for worker reporting and object linking.
   * @param {...*} args - Arguments forwarded unchanged to the `Runner` constructor
   */
  constructor(...args) {
    super(...args);

    // `_state` is backed by a closure variable so that every assignment is
    // validated against the `states` transition table.
    let state = IDLE;
    Object.defineProperty(this, '_state', {
      get() {
        return state;
      },
      set(newState) {
        if (states[state].has(newState)) {
          state = newState;
        } else {
          throw new Error(`invalid state transition: ${state} => ${newState}`);
        }
      }
    });

    this._workerReporter = DEFAULT_WORKER_REPORTER;
    this._linkPartialObjects = false;
    // maps an object's Mocha ID to the object instance handed to listeners
    this._linkedObjectMap = new Map();

    this.once(Runner.constants.EVENT_RUN_END, () => {
      this._state = COMPLETE;
    });
  }

  /**
   * Returns a mapping function to enqueue a file in the worker pool and
   * re-emit the events produced by its execution.
   * @param {BufferedWorkerPool} pool - Worker pool
   * @param {Options} options - Mocha options
   * @returns {FileRunner} Mapping function
   * @private
   */
  _createFileRunner(pool, options) {
    /**
     * Emits event and sets `BAILING` state, if necessary.
     * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
     * @param {number} failureCount - Failure count
     */
    const emitEvent = (event, failureCount) => {
      this.emit(event.eventName, event.data, event.error);
      if (
        this._state !== BAILING &&
        event.data &&
        event.data._bail &&
        (failureCount || event.error)
      ) {
        debug('run(): nonzero failure count & found bail flag');
        // we need to let the events complete for this file, as the worker
        // should run any cleanup hooks
        this._state = BAILING;
      }
    };

    /**
     * Given an event, recursively find any objects in its data that have ID's,
     * and create object references to already-seen objects. An event whose
     * `data` is absent or not an object is left untouched.
     * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
     * @throws If `data` is an object which does not carry an ID
     */
    const linkEvent = event => {
      const stack = [{parent: event, prop: 'data'}];
      while (stack.length) {
        const {parent, prop} = stack.pop();
        const obj = parent[prop];
        if (!obj || typeof obj !== 'object') {
          // nothing to link; without this guard `Object.keys()` below would be
          // handed `undefined` and throw a `TypeError`
          continue;
        }
        const id = obj[MOCHA_ID_PROP_NAME];
        if (!id) {
          throw createFatalError(
            'Object missing ID received in event data',
            obj
          );
        }
        // if this ID was seen before, fold the new properties into the
        // existing object so earlier references observe the update
        const newObj = this._linkedObjectMap.has(id)
          ? Object.assign(this._linkedObjectMap.get(id), obj)
          : obj;
        this._linkedObjectMap.set(id, newObj);
        parent[prop] = newObj;

        // only properties present on the incoming object are descended into
        Object.keys(newObj).forEach(key => {
          const value = obj[key];
          if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
            stack.push({parent: newObj, prop: key});
          }
        });
      }
    };

    return async file => {
      debug('run(): enqueueing test file %s', file);
      try {
        const {failureCount, events} = await pool.run(file, options);

        if (this._state === BAILED) {
          // short-circuit after a graceful bail. if this happens,
          // some other worker has bailed.
          // TODO: determine if this is the desired behavior, or if we
          // should report the events of this run anyway.
          return;
        }
        debug(
          'run(): completed run of file %s; %d failures / %d events',
          file,
          failureCount,
          events.length
        );
        this.failures += failureCount; // can this ever be non-numeric?
        let event = events.shift();

        if (this._linkPartialObjects) {
          while (event) {
            linkEvent(event);
            emitEvent(event, failureCount);
            event = events.shift();
          }
        } else {
          while (event) {
            emitEvent(event, failureCount);
            event = events.shift();
          }
        }
        if (this._state === BAILING) {
          debug('run(): terminating pool due to "bail" flag');
          this._state = BAILED;
          await pool.terminate();
        }
      } catch (err) {
        if (this._state === BAILED || this._state === ABORTING) {
          debug(
            'run(): worker pool terminated with intent; skipping file %s',
            file
          );
        } else {
          // this is an uncaught exception
          debug('run(): encountered uncaught exception: %O', err);
          if (this.allowUncaught) {
            // still have to clean up
            this._state = ABORTING;
            await pool.terminate(true);
          }
          throw err;
        }
      } finally {
        debug('run(): done running file %s', file);
      }
    };
  }

  /**
   * Listen on `Process.SIGINT`; terminate pool if caught.
   * Returns the listener for later call to `process.removeListener()`.
   * @param {BufferedWorkerPool} pool - Worker pool
   * @returns {SigIntListener} Listener
   * @private
   */
  _bindSigIntListener(pool) {
    const sigIntListener = async () => {
      debug('run(): caught a SIGINT');
      this._state = ABORTING;

      try {
        debug('run(): force-terminating worker pool');
        await pool.terminate(true);
      } catch (err) {
        console.error(
          `Error while attempting to force-terminate worker pool: ${err}`
        );
        process.exitCode = 1;
      } finally {
        // the listener was registered with `once()`, so it is gone by now and
        // re-sending the signal is no longer intercepted by it
        process.nextTick(() => {
          debug('run(): imminent death');
          this._state = ABORTED;
          process.kill(process.pid, 'SIGINT');
        });
      }
    };

    process.once('SIGINT', sigIntListener);

    return sigIntListener;
  }

  /**
   * Runs Mocha tests by creating a thread pool, then delegating work to the
   * worker threads.
   *
   * Each worker receives one file, and as workers become available, they take a
   * file from the queue and run it. The worker thread execution is treated like
   * an RPC--it returns a `Promise` containing serialized information about the
   * run. The information is processed as it's received, and emitted to a
   * {@link Reporter}, which is likely listening for these events.
   *
   * @param {Function} callback - Called with an exit code corresponding to
   * number of test failures.
   * @param {{files: string[], options: Options}} opts - Files to run and
   * command-line options, respectively.
   * @returns {ParallelBufferedRunner} This instance; the run itself proceeds
   * asynchronously.
   */
  run(callback, {files, options = {}} = {}) {
    /**
     * Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
     */
    let sigIntListener;

    // assign the reporter the worker will use, which will be different than the
    // main process' reporter
    options = {...options, reporter: this._workerReporter};

    // This function should _not_ return a `Promise`; its parent (`Runner#run`)
    // returns this instance, so this should do the same. However, we want to make
    // use of `async`/`await`, so we use this IIFE.
    (async () => {
      /**
       * This is an interval that outputs stats about the worker pool every so often
       */
      let debugInterval;

      /**
       * @type {BufferedWorkerPool}
       */
      let pool;

      try {
        pool = BufferedWorkerPool.create({maxWorkers: options.jobs});

        sigIntListener = this._bindSigIntListener(pool);

        /* istanbul ignore next */
        debugInterval = setInterval(
          () => debugStats(pool),
          DEBUG_STATS_INTERVAL
        ).unref();

        // this is set for uncaught exception handling in `Runner#uncaught`
        // TODO: `Runner` should be using a state machine instead.
        this.started = true;
        this._state = RUNNING;

        this.emit(EVENT_RUN_BEGIN);

        // strip the options which must not be sent to workers; operate on a
        // copy so the object built above is not mutated
        options = {...options};
        DENY_OPTIONS.forEach(opt => {
          delete options[opt];
        });

        const results = await allSettled(
          files.map(this._createFileRunner(pool, options))
        );

        // note that pool may already be terminated due to --bail
        await pool.terminate();

        results
          .filter(({status}) => status === 'rejected')
          .forEach(({reason}) => {
            if (this.allowUncaught) {
              // yep, just the first one.
              throw reason;
            }
            // "rejected" will correspond to uncaught exceptions.
            // unlike the serial runner, the parallel runner can always recover.
            this.uncaught(reason);
          });

        if (this._state === ABORTING) {
          return;
        }

        this.emit(EVENT_RUN_END);
        debug('run(): completing with failure count %d', this.failures);
        callback(this.failures);
      } catch (err) {
        // this `nextTick` takes us out of the `Promise` scope, so the
        // exception will not be caught and returned as a rejected `Promise`,
        // which would lead to an `unhandledRejection` event.
        process.nextTick(() => {
          debug('run(): re-throwing uncaught exception');
          throw err;
        });
      } finally {
        clearInterval(debugInterval);
        // if creating the pool threw, no listener was ever bound, and
        // `removeListener()` itself throws when not given a function
        if (sigIntListener) {
          process.removeListener('SIGINT', sigIntListener);
        }
      }
    })();
    return this;
  }

  /**
   * Toggle partial object linking behavior; used for building object references from
   * unique ID's.
   * @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable
   * @returns {Runner} Result of the superclass' `linkPartialObjects()`
   * @chainable
   * @public
   * @example
   * // this reporter needs proper object references when run in parallel mode
   * class MyReporter {
   *   constructor(runner) {
   *     runner.linkPartialObjects(true)
   *       .on(EVENT_SUITE_BEGIN, suite => {
   *         // this Suite may be the same object...
   *       })
   *       .on(EVENT_TEST_BEGIN, test => {
   *         // ...as the `test.parent` property
   *       });
   *   }
   * }
   */
  linkPartialObjects(value) {
    this._linkPartialObjects = Boolean(value);
    return super.linkPartialObjects(value);
  }

  /**
   * If this class is the `Runner` in use, then this is going to return `true`.
   *
   * For use by reporters.
   * @returns {true}
   * @public
   */
  isParallelMode() {
    return true;
  }

  /**
   * Configures an alternate reporter for worker processes to use. The value is
   * sent to workers as the `reporter` option on the next call to `run()`.
   * @public
   * @param {string} reporter - Absolute path to alternate reporter for worker processes to use
   * @returns {ParallelBufferedRunner} This instance
   * @chainable
   */
  workerReporter(reporter) {
    this._workerReporter = reporter;
    return this;
  }
}
| |
| module.exports = ParallelBufferedRunner; |
| |
| /** |
| * Listener function intended to be bound to `Process.SIGINT` event |
| * @private |
| * @callback SigIntListener |
| * @returns {Promise<void>} |
| */ |
| |
/**
 * A function accepting a test file path, which runs that file in the worker
 * pool and re-emits the resulting events on the runner. The returned `Promise`
 * resolves with no value.
 * @private
 * @callback FileRunner
 * @param {string} filename - File to run
 * @returns {Promise<void>}
 */