diff --git a/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json new file mode 100644 index 0000000000..f2a6516155 --- /dev/null +++ b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "Add a `\"weight\"` property to the `\"operation\"` object in the project `config/rush-project.json` file that defines an integer weight for how much of the allowed parallelism the operation uses.", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json new file mode 100644 index 0000000000..c0dd13175b --- /dev/null +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Add a new `weighted: true` option to the `Async.forEachAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", + "type": "minor" + } + ], + "packageName": "@rushstack/node-core-library" +} diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json new file mode 100644 index 0000000000..9dff8a4060 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Add a new `weighted: true` option to the `Async.mapAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", + "type": "patch" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index a94b4110eb..f850c8bef6 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -25,11 +25,22 @@ export class AlreadyReportedError extends Error { // @public export class Async { - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: (IAsyncParallelismOptions & { + weighted?: false; + }) | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { + weighted: true; + }): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; - static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: (IAsyncParallelismOptions & { + weighted?: false; + }) | undefined): Promise; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { + weighted: true; + }): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; + static validateWeightedIterable(operation: IWeighted): void; } // @public @@ -225,6 +236,7 @@ export type FolderItem = fs.Dirent; // @public export interface IAsyncParallelismOptions { concurrency?: number; + weighted?: boolean; } // @public @@ -602,6 +614,11 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { encoding: BufferEncoding; } +// @public (undocumented) +export interface IWeighted { + weight: number; +} + // @public export class JsonFile { // @internal (undocumented) diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index a5f9c97cfd..4a2f583f7e 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -619,6 +619,7 @@ export interface IOperationSettings { disableBuildCacheForOperation?: boolean; operationName: string; outputFolderNames?: string[]; + weight?: number; } // @internal (undocumented) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index c68f22240b..400cb8e00d 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -5,16 +5,24 @@ * Options for controlling the parallelism of asynchronous operations. * * @remarks - * Used with {@link Async.mapAsync} and {@link Async.forEachAsync}. + * Used with {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and + * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)}. * * @public */ export interface IAsyncParallelismOptions { /** - * Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync} - * to limit the maximum number of concurrent promises to the specified number. + * Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and + * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum + * number of concurrent promises to the specified number. */ concurrency?: number; + + /** + * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can + * take up more or less than one concurrency unit. + */ + weighted?: boolean; } /** @@ -29,6 +37,42 @@ export interface IRunWithRetriesOptions { retryDelayMs?: number; } +/** + * @remarks + * Used with {@link (Async:class).(forEachAsync:2)} and {@link (Async:class).(mapAsync:2)}. + * + * @public + */ +export interface IWeighted { + /** + * The weight of the element, used to determine the concurrency units that it will take up. + * Must be a whole number greater than or equal to 0. + */ + weight: number; +} + +function toWeightedIterator( + iterable: Iterable | AsyncIterable, + useWeights?: boolean +): AsyncIterable<{ element: TEntry; weight: number }> { + const iterator: Iterator | AsyncIterator = ( + (iterable as Iterable)[Symbol.iterator] || + (iterable as AsyncIterable)[Symbol.asyncIterator] + ).call(iterable); + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + // The await is necessary here, but TS will complain - it's a false positive. + const { value, done } = await iterator.next(); + return { + value: { element: value, weight: useWeights ? value?.weight : 1 }, + done: !!done + }; + } + }) + }; +} + /** * Utilities for parallel asynchronous operations, for use with the system `Promise` APIs. * @@ -58,29 +102,18 @@ export class Async { public static async mapAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, - options?: IAsyncParallelismOptions | undefined - ): Promise { - const result: TRetVal[] = []; - - await Async.forEachAsync( - iterable, - async (item: TEntry, arrayIndex: number): Promise => { - result[arrayIndex] = await callback(item, arrayIndex); - }, - options - ); - - return result; - } + options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined + ): Promise; /** * Given an input array and a `callback` function, invoke the callback to start a - * promise for each element in the array. + * promise for each element in the array. Returns an array containing the results. * * @remarks - * This API is similar to the system `Array#forEach`, except that the loop is asynchronous, - * and the maximum number of concurrent promises can be throttled - * using {@link IAsyncParallelismOptions.concurrency}. + * This API is similar to the system `Array#map`, except that the loop is asynchronous, + * and the maximum number of concurrent units can be throttled + * using {@link IAsyncParallelismOptions.concurrency}. Using the {@link IAsyncParallelismOptions.weighted} + * option, the weight of each operation can be specified, which determines how many concurrent units it takes up. * * If `callback` throws a synchronous exception, or if it returns a promise that rejects, * then the loop stops immediately. Any remaining array items will be skipped, and @@ -90,16 +123,42 @@ export class Async { * @param callback - a function that starts an asynchronous promise for an element * from the array * @param options - options for customizing the control flow + * @returns an array containing the result for each callback, in the same order + * as the original input `array` */ - public static async forEachAsync( + public static async mapAsync( iterable: Iterable | AsyncIterable, - callback: (entry: TEntry, arrayIndex: number) => Promise, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options: IAsyncParallelismOptions & { weighted: true } + ): Promise; + public static async mapAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: IAsyncParallelismOptions | undefined + ): Promise { + const result: TRetVal[] = []; + + // @ts-expect-error https://github.com/microsoft/TypeScript/issues/22609, it succeeds against the implementation but fails against the overloads + await Async.forEachAsync( + iterable, + async (item: TEntry, arrayIndex: number): Promise => { + result[arrayIndex] = await callback(item, arrayIndex); + }, + options + ); + + return result; + } + + private static async _forEachWeightedAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { await new Promise((resolve: () => void, reject: (error: Error) => void) => { const concurrency: number = options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; - let operationsInProgress: number = 0; + let concurrentUnitsInProgress: number = 0; const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || @@ -111,18 +170,29 @@ export class Async { let promiseHasResolvedOrRejected: boolean = false; async function queueOperationsAsync(): Promise { - while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) { + while ( + concurrentUnitsInProgress < concurrency && + !iteratorIsComplete && + !promiseHasResolvedOrRejected + ) { // Increment the concurrency while waiting for the iterator. // This function is reentrant, so this ensures that at most `concurrency` executions are waiting - operationsInProgress++; + concurrentUnitsInProgress++; const currentIteratorResult: IteratorResult = await iterator.next(); // eslint-disable-next-line require-atomic-updates iteratorIsComplete = !!currentIteratorResult.done; if (!iteratorIsComplete) { - Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) + const currentIteratorValue: TEntry = currentIteratorResult.value; + Async.validateWeightedIterable(currentIteratorValue); + const weight: number = Math.min(currentIteratorValue.weight, concurrency); + // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. + // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted + // operations are present. + concurrentUnitsInProgress += weight - 1; + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { - operationsInProgress--; + concurrentUnitsInProgress -= weight; await onOperationCompletionAsync(); }) .catch((error) => { @@ -131,7 +201,7 @@ export class Async { }); } else { // The iterator is complete and there wasn't a value, so untrack the waiting state. - operationsInProgress--; + concurrentUnitsInProgress--; } } @@ -142,7 +212,7 @@ export class Async { async function onOperationCompletionAsync(): Promise { if (!promiseHasResolvedOrRejected) { - if (operationsInProgress === 0 && iteratorIsComplete) { + if (concurrentUnitsInProgress === 0 && iteratorIsComplete) { promiseHasResolvedOrRejected = true; resolve(); } else if (!iteratorIsComplete) { @@ -158,6 +228,68 @@ export class Async { }); } + /** + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. + * + * @remarks + * This API is similar to the system `Array#forEach`, except that the loop is asynchronous, + * and the maximum number of concurrent promises can be throttled + * using {@link IAsyncParallelismOptions.concurrency}. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param iterable - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + */ + public static async forEachAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined + ): Promise; + + /** + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. + * + * @remarks + * This API is similar to the other `Array#forEachAsync`, except that each item can have + * a weight that determines how many concurrent operations are allowed. The unweighted + * `Array#forEachAsync` is a special case of this method where weight = 1 for all items. + * + * The maximum number of concurrent operations can still be throttled using + * {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the + * maximum number of operations that can be in progress at once. Instead, it determines the + * number of concurrency units that can be in progress at once. The weight of each operation + * determines how many concurrency units it takes up. For example, if the concurrency is 2 + * and the first operation has a weight of 2, then only one more operation can be in progress. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param iterable - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + */ + public static async forEachAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options: IAsyncParallelismOptions & { weighted: true } + ): Promise; + public static async forEachAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: IAsyncParallelismOptions + ): Promise { + await Async._forEachWeightedAsync(toWeightedIterator(iterable, options?.weighted), callback, options); + } + /** * Return a promise that resolves after the specified number of milliseconds. */ @@ -190,6 +322,19 @@ export class Async { } } + /** + * Ensures that the argument is a valid {@link IWeighted}, with a `weight` argument that + * is a positive integer or 0. + */ + public static validateWeightedIterable(operation: IWeighted): void { + if (operation.weight < 0) { + throw new Error('Weight must be a whole number greater than or equal to 0'); + } + if (operation.weight % 1 !== 0) { + throw new Error('Weight must be a whole number greater than or equal to 0'); + } + } + /** * Returns a Signal, a.k.a. a "deferred promise". */ diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index 2a3be48e80..e7c7a31e7a 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -8,7 +8,7 @@ */ export { AlreadyReportedError } from './AlreadyReportedError'; -export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async'; +export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions, IWeighted } from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index c2c27e1fb5..0060f33805 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -312,6 +312,188 @@ describe(Async.name, () => { Async.forEachAsync(syncIterable, async (item) => await Async.sleep(0)) ).rejects.toThrow(expectedError); }); + + interface INumberWithWeight { + n: number; + weight: number; + } + + it('handles an empty array correctly', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = []; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(0); + expect(maxRunning).toEqual(0); + }); + + it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ weight: 1, n })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + + it('if concurrency is set but weighted is not, ensures no more than N operations occur in parallel and ignores operation weight', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ weight: 2, n })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + + it.each([ + { + concurrency: 4, + weight: 4, + expectedConcurrency: 1 + }, + { + concurrency: 4, + weight: 1, + expectedConcurrency: 4 + }, + { + concurrency: 3, + weight: 1, + expectedConcurrency: 3 + }, + { + concurrency: 6, + weight: 2, + expectedConcurrency: 3 + }, + { + concurrency: 12, + weight: 3, + expectedConcurrency: 4 + } + ])( + 'if concurrency is set to $concurrency with operation weight $weight, ensures no more than $expectedConcurrency operations occur in parallel', + async ({ concurrency, weight, expectedConcurrency }) => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency, weighted: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(expectedConcurrency); + } + ); + + it('ensures that a large operation cannot be scheduled around', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 1 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 1 }, + { n: 7, weight: 5 }, + { n: 8, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + + it('waits for a large operation to finish before scheduling more', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 10 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 10 }, + { n: 7, weight: 1 }, + { n: 8, weight: 10 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(2); + }); + + it('allows operations with a weight of 0 and schedules them accordingly', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight: 0 })); + + array.unshift({ n: 9, weight: 3 }); + + array.push({ n: 10, weight: 3 }); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); + expect(fn).toHaveBeenCalledTimes(10); + expect(maxRunning).toEqual(9); + }); }); describe(Async.runWithRetriesAsync.name, () => { diff --git a/libraries/rush-lib/src/api/RushProjectConfiguration.ts b/libraries/rush-lib/src/api/RushProjectConfiguration.ts index a541f26755..988df6f91d 100644 --- a/libraries/rush-lib/src/api/RushProjectConfiguration.ts +++ b/libraries/rush-lib/src/api/RushProjectConfiguration.ts @@ -92,6 +92,12 @@ export interface IOperationSettings { * calculating final hash value when reading and writing the build cache */ dependsOnAdditionalFiles?: string[]; + + /** + * How many concurrency units this operation should take up during execution. The maximum concurrent units is + * determined by the -p flag. + */ + weight?: number; } interface IOldRushProjectJson { diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index 37667eb7b9..64732beb78 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -48,6 +48,7 @@ import { RushProjectConfiguration } from '../../api/RushProjectConfiguration'; import { LegacySkipPlugin } from '../../logic/operations/LegacySkipPlugin'; import { ValidateOperationsPlugin } from '../../logic/operations/ValidateOperationsPlugin'; import type { ProjectWatcher } from '../../logic/ProjectWatcher'; +import { WeightedOperationPlugin } from '../../logic/operations/WeightedOperationPlugin'; /** * Constructor parameters for PhasedScriptAction. @@ -165,6 +166,8 @@ export class PhasedScriptAction extends BaseScriptAction { new PhasedOperationPlugin().apply(this.hooks); // Applies the Shell Operation Runner to selected operations new ShellOperationRunnerPlugin().apply(this.hooks); + + new WeightedOperationPlugin().apply(this.hooks); new ValidateOperationsPlugin(terminal).apply(this.hooks); if (this._enableParallelism) { diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index cb24ad0f63..4d0ad5b88c 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -15,7 +15,9 @@ import { RushConstants } from '../RushConstants'; */ export const UNASSIGNED_OPERATION: 'UNASSIGNED_OPERATION' = 'UNASSIGNED_OPERATION'; -export type IOperationIteratorResult = OperationExecutionRecord | typeof UNASSIGNED_OPERATION; +export type IOperationIteratorResult = + | OperationExecutionRecord + | { weight: 0; status: typeof UNASSIGNED_OPERATION }; /** * Implementation of the async iteration protocol for a collection of IOperation objects. @@ -164,7 +166,7 @@ export class AsyncOperationQueue // remote executing operation which is not ready to process. if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) { waitingIterators.shift()!({ - value: UNASSIGNED_OPERATION, + value: { weight: 0, status: UNASSIGNED_OPERATION }, done: false }); } diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index d981dcaa58..db86ca8e19 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -262,7 +262,7 @@ export class OperationExecutionManager { * This happens when some operations run remotely. So, we should try to get a remote executing operation * from the queue manually here. */ - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { // Pause for a few time await Async.sleep(5000); record = this._executionQueue.tryGetRemoteExecutingOperation(); @@ -281,7 +281,8 @@ export class OperationExecutionManager { } }, { - concurrency: maxParallelism + concurrency: maxParallelism, + weighted: true } ); diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts new file mode 100644 index 0000000000..c33e597154 --- /dev/null +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. +// See LICENSE in the project root for license information. + +import type { Operation } from './Operation'; +import type { + ICreateOperationsContext, + IPhasedCommandPlugin, + PhasedCommandHooks +} from '../../pluginFramework/PhasedCommandHooks'; +import type { IOperationSettings, RushProjectConfiguration } from '../../api/RushProjectConfiguration'; +import type { IOperationExecutionResult } from './IOperationExecutionResult'; +import type { OperationExecutionRecord } from './OperationExecutionRecord'; +import { Async } from '@rushstack/node-core-library'; + +const PLUGIN_NAME: 'WeightedOperationPlugin' = 'WeightedOperationPlugin'; + +/** + * Add weights to operations based on the operation settings in rush-project.json. + * + * This also sets the weight of no-op operations to 0. + */ +export class WeightedOperationPlugin implements IPhasedCommandPlugin { + public apply(hooks: PhasedCommandHooks): void { + hooks.beforeExecuteOperations.tap(PLUGIN_NAME, weightOperations); + } +} + +function weightOperations( + operations: Map, + context: ICreateOperationsContext +): Map { + const { projectConfigurations } = context; + + for (const [operation, record] of operations) { + const { runner } = record as OperationExecutionRecord; + const { associatedProject: project, associatedPhase: phase } = operation; + if (runner!.isNoOp) { + operation.weight = 0; + } else if (project && phase) { + const projectConfiguration: RushProjectConfiguration | undefined = projectConfigurations.get(project); + const operationSettings: IOperationSettings | undefined = + projectConfiguration?.operationSettingsByOperationName.get(phase.name); + if (operationSettings?.weight) { + operation.weight = operationSettings.weight; + } + } + Async.validateWeightedIterable(operation); + } + return operations; +} diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index 3afd3769d9..ebcdc71eb7 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -47,7 +47,7 @@ describe(AsyncOperationQueue.name, () => { const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -76,7 +76,7 @@ describe(AsyncOperationQueue.name, () => { const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, customSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -135,7 +135,7 @@ describe(AsyncOperationQueue.name, () => { await Promise.all( Array.from({ length: 3 }, async () => { for await (const operation of queue) { - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -184,7 +184,7 @@ describe(AsyncOperationQueue.name, () => { let remoteExecuted: boolean = false; for await (const operation of queue) { let record: OperationExecutionRecord | undefined; - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { await Async.sleep(100); record = queue.tryGetRemoteExecutingOperation(); } else { diff --git a/libraries/rush-lib/src/schemas/rush-project.schema.json b/libraries/rush-lib/src/schemas/rush-project.schema.json index d309a568e4..b6f171ddef 100644 --- a/libraries/rush-lib/src/schemas/rush-project.schema.json +++ b/libraries/rush-lib/src/schemas/rush-project.schema.json @@ -72,6 +72,12 @@ "disableBuildCacheForOperation": { "description": "Disable caching for this operation. The operation will never be restored from cache. This may be useful if this operation affects state outside of its folder.", "type": "boolean" + }, + + "weight": { + "description": "The number of concurrency units that this operation should take up. The maximum concurrency units is determined by the -p flag.", + "type": "integer", + "minimum": 0 } } }