From 919dff6d9db4bc73de7504a0cad72672393f0d27 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Mon, 29 Apr 2024 16:39:58 -0400 Subject: [PATCH 01/28] start working on weighted graph --- libraries/node-core-library/src/Async.ts | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index c68f22240b..082bca7f23 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -29,6 +29,10 @@ export interface IRunWithRetriesOptions { retryDelayMs?: number; } +export interface WeightedOperation { + weight: number; +} + /** * Utilities for parallel asynchronous operations, for use with the system `Promise` APIs. * @@ -73,6 +77,55 @@ export class Async { return result; } + public static async forEachWeightedAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: IAsyncParallelismOptions | undefined + ): Promise { + const concurrency: number = + options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; + + const iterator: Iterator | AsyncIterator = ( + (iterable as Iterable)[Symbol.iterator] || + (iterable as AsyncIterable)[Symbol.asyncIterator] + ).call(iterable); + + let arrayIndex: number = 0; + let usedCapacity: number = 0; + + const pending: Set> = new Set(); + + // eslint-disable-next-line no-constant-condition + while (true) { + const currentIteratorResult: IteratorResult = await iterator.next(); + if (currentIteratorResult.done) { + break; + } + + const { value } = currentIteratorResult; + const weight: number = value.weight ?? 1; + if (weight < 0) { + throw new Error(`Invalid weight ${weight}. Weights must be greater than or equal to 0.`); + } + + usedCapacity += weight; + const promise: Promise = Promise.resolve(callback(value, arrayIndex++)).then(() => { + usedCapacity -= weight; + pending.delete(promise); + }); + pending.add(promise); + + // eslint-disable-next-line no-unmodified-loop-condition + while (usedCapacity >= concurrency && pending.size > 0) { + await Promise.race(Array.from(pending)); + } + } + + if (pending.size > 0) { + await Promise.all(Array.from(pending)); + } + } + /** * Given an input array and a `callback` function, invoke the callback to start a * promise for each element in the array. From ea5bae4073ceb4533a20b24ebec3e7f575c3aa08 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Tue, 30 Apr 2024 18:03:58 -0400 Subject: [PATCH 02/28] adds a new forEachWeightedAsync method to Async that uses operation weights to scale concurrent units --- common/reviews/api/node-core-library.api.md | 8 ++ common/reviews/api/rush-lib.api.md | 1 + libraries/node-core-library/src/Async.ts | 102 ++++++++++++------ libraries/node-core-library/src/index.ts | 8 +- .../node-core-library/src/test/Async.test.ts | 80 ++++++++++++++ .../src/api/RushProjectConfiguration.ts | 6 ++ .../cli/scriptActions/PhasedScriptAction.ts | 3 + .../logic/operations/AsyncOperationQueue.ts | 6 +- .../operations/OperationExecutionManager.ts | 4 +- .../operations/WeightedOperationPlugin.ts | 42 ++++++++ .../test/AsyncOperationQueue.test.ts | 8 +- .../src/schemas/rush-project.schema.json | 5 + 12 files changed, 229 insertions(+), 44 deletions(-) create mode 100644 libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index a94b4110eb..c480216737 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -26,6 +26,8 @@ export class AlreadyReportedError extends Error { // @public export class Async { static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + // (undocumented) + static forEachWeightedAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; @@ -602,6 +604,12 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { encoding: BufferEncoding; } +// @public (undocumented) +export interface IWeightedIterable { + // (undocumented) + weight: number; +} + // @public export class JsonFile { // @internal (undocumented) diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index 67cea3c845..4550e846e0 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -619,6 +619,7 @@ export interface IOperationSettings { disableBuildCacheForOperation?: boolean; operationName: string; outputFolderNames?: string[]; + weight?: number; } // @internal (undocumented) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 082bca7f23..c0930961b9 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -29,7 +29,13 @@ export interface IRunWithRetriesOptions { retryDelayMs?: number; } -export interface WeightedOperation { +/** + * @remarks + * Used with {@link Async.forEachWeightedAsync}. + * + * @public + */ +export interface IWeightedIterable { weight: number; } @@ -77,53 +83,79 @@ export class Async { return result; } - public static async forEachWeightedAsync( + public static async forEachWeightedAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { - const concurrency: number = - options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; + await new Promise((resolve: () => void, reject: (error: Error) => void) => { + const concurrency: number = + options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; + let concurrentUnitsInProgress: number = 0; - const iterator: Iterator | AsyncIterator = ( - (iterable as Iterable)[Symbol.iterator] || - (iterable as AsyncIterable)[Symbol.asyncIterator] - ).call(iterable); + const iterator: Iterator | AsyncIterator = ( + (iterable as Iterable)[Symbol.iterator] || + (iterable as AsyncIterable)[Symbol.asyncIterator] + ).call(iterable); - let arrayIndex: number = 0; - let usedCapacity: number = 0; + let arrayIndex: number = 0; + let iteratorIsComplete: boolean = false; + let promiseHasResolvedOrRejected: boolean = false; - const pending: Set> = new Set(); + async function queueOperationsAsync(): Promise { + while ( + concurrentUnitsInProgress < concurrency && + !iteratorIsComplete && + !promiseHasResolvedOrRejected + ) { + // Increment the concurrency while waiting for the iterator. + // This function is reentrant, so this ensures that at most `concurrency` executions are waiting + concurrentUnitsInProgress++; + const currentIteratorResult: IteratorResult = await iterator.next(); + // eslint-disable-next-line require-atomic-updates + iteratorIsComplete = !!currentIteratorResult.done; - // eslint-disable-next-line no-constant-condition - while (true) { - const currentIteratorResult: IteratorResult = await iterator.next(); - if (currentIteratorResult.done) { - break; - } + if (!iteratorIsComplete) { + // Typescript refuses to narrow the typing here even though gets narrowed with `!currentIteratorResult.done` in the if. + const weight: number = (currentIteratorResult.value as TEntry).weight; + // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. + concurrentUnitsInProgress += weight - 1; + Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) + .then(async () => { + concurrentUnitsInProgress -= weight; + await onOperationCompletionAsync(); + }) + .catch((error) => { + promiseHasResolvedOrRejected = true; + reject(error); + }); + } else { + // The iterator is complete and there wasn't a value, so untrack the waiting state. + concurrentUnitsInProgress--; + } + } - const { value } = currentIteratorResult; - const weight: number = value.weight ?? 1; - if (weight < 0) { - throw new Error(`Invalid weight ${weight}. Weights must be greater than or equal to 0.`); + if (iteratorIsComplete) { + await onOperationCompletionAsync(); + } } - usedCapacity += weight; - const promise: Promise = Promise.resolve(callback(value, arrayIndex++)).then(() => { - usedCapacity -= weight; - pending.delete(promise); - }); - pending.add(promise); - - // eslint-disable-next-line no-unmodified-loop-condition - while (usedCapacity >= concurrency && pending.size > 0) { - await Promise.race(Array.from(pending)); + async function onOperationCompletionAsync(): Promise { + if (!promiseHasResolvedOrRejected) { + if (concurrentUnitsInProgress === 0 && iteratorIsComplete) { + promiseHasResolvedOrRejected = true; + resolve(); + } else if (!iteratorIsComplete) { + await queueOperationsAsync(); + } + } } - } - if (pending.size > 0) { - await Promise.all(Array.from(pending)); - } + queueOperationsAsync().catch((error) => { + promiseHasResolvedOrRejected = true; + reject(error); + }); + }); } /** diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index 2a3be48e80..ad57ffe89f 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -8,7 +8,13 @@ */ export { AlreadyReportedError } from './AlreadyReportedError'; -export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions } from './Async'; +export { + Async, + AsyncQueue, + IAsyncParallelismOptions, + IRunWithRetriesOptions, + IWeightedIterable +} from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index c2c27e1fb5..cbc8dfd625 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -314,6 +314,86 @@ describe(Async.name, () => { }); }); + describe(Async.forEachWeightedAsync.name, () => { + interface INumberWithWeight { + n: number; + weight: number; + } + + it('handles an empty array correctly', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = []; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(0); + expect(maxRunning).toEqual(0); + }); + + it('if concurrency is set, ensures no more than N operations occur in parallel', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ weight: 1, n })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + + it.each([ + { + concurrency: 2, + weight: 0.5, + expectedConcurrency: 4 + }, + { + concurrency: 1, + weight: 0.4, + expectedConcurrency: 3 + }, + { + concurrency: 1, + weight: 0.17, + expectedConcurrency: 6 + } + ])( + 'if concurrency is set to $concurrency with operation weight $weight, ensures no more than $expectedConcurrency operations occur in parallel', + async ({ concurrency, weight, expectedConcurrency }) => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(expectedConcurrency); + } + ); + }); + describe(Async.runWithRetriesAsync.name, () => { it('Correctly handles a sync function that succeeds the first time', async () => { const expectedResult: string = 'RESULT'; diff --git a/libraries/rush-lib/src/api/RushProjectConfiguration.ts b/libraries/rush-lib/src/api/RushProjectConfiguration.ts index a541f26755..988df6f91d 100644 --- a/libraries/rush-lib/src/api/RushProjectConfiguration.ts +++ b/libraries/rush-lib/src/api/RushProjectConfiguration.ts @@ -92,6 +92,12 @@ export interface IOperationSettings { * calculating final hash value when reading and writing the build cache */ dependsOnAdditionalFiles?: string[]; + + /** + * How many concurrency units this operation should take up during execution. The maximum concurrent units is + * determined by the -p flag. + */ + weight?: number; } interface IOldRushProjectJson { diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index 37667eb7b9..64732beb78 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -48,6 +48,7 @@ import { RushProjectConfiguration } from '../../api/RushProjectConfiguration'; import { LegacySkipPlugin } from '../../logic/operations/LegacySkipPlugin'; import { ValidateOperationsPlugin } from '../../logic/operations/ValidateOperationsPlugin'; import type { ProjectWatcher } from '../../logic/ProjectWatcher'; +import { WeightedOperationPlugin } from '../../logic/operations/WeightedOperationPlugin'; /** * Constructor parameters for PhasedScriptAction. @@ -165,6 +166,8 @@ export class PhasedScriptAction extends BaseScriptAction { new PhasedOperationPlugin().apply(this.hooks); // Applies the Shell Operation Runner to selected operations new ShellOperationRunnerPlugin().apply(this.hooks); + + new WeightedOperationPlugin().apply(this.hooks); new ValidateOperationsPlugin(terminal).apply(this.hooks); if (this._enableParallelism) { diff --git a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts index cb24ad0f63..4d0ad5b88c 100644 --- a/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts +++ b/libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts @@ -15,7 +15,9 @@ import { RushConstants } from '../RushConstants'; */ export const UNASSIGNED_OPERATION: 'UNASSIGNED_OPERATION' = 'UNASSIGNED_OPERATION'; -export type IOperationIteratorResult = OperationExecutionRecord | typeof UNASSIGNED_OPERATION; +export type IOperationIteratorResult = + | OperationExecutionRecord + | { weight: 0; status: typeof UNASSIGNED_OPERATION }; /** * Implementation of the async iteration protocol for a collection of IOperation objects. @@ -164,7 +166,7 @@ export class AsyncOperationQueue // remote executing operation which is not ready to process. if (queue.some((operation) => operation.status === OperationStatus.RemoteExecuting)) { waitingIterators.shift()!({ - value: UNASSIGNED_OPERATION, + value: { weight: 0, status: UNASSIGNED_OPERATION }, done: false }); } diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index d981dcaa58..554848d7b0 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -253,7 +253,7 @@ export class OperationExecutionManager { return await this._beforeExecuteOperation?.(record); }; - await Async.forEachAsync( + await Async.forEachWeightedAsync( this._executionQueue, async (operation: IOperationIteratorResult) => { let record: OperationExecutionRecord | undefined; @@ -262,7 +262,7 @@ export class OperationExecutionManager { * This happens when some operations run remotely. So, we should try to get a remote executing operation * from the queue manually here. */ - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { // Pause for a few time await Async.sleep(5000); record = this._executionQueue.tryGetRemoteExecutingOperation(); diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts new file mode 100644 index 0000000000..271cb23bd4 --- /dev/null +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. +// See LICENSE in the project root for license information. + +import type { Operation } from './Operation'; +import type { + ICreateOperationsContext, + IPhasedCommandPlugin, + PhasedCommandHooks +} from '../../pluginFramework/PhasedCommandHooks'; +import type { IOperationSettings, RushProjectConfiguration } from '../../api/RushProjectConfiguration'; + +const PLUGIN_NAME: 'WeightedOperationPlugin' = 'WeightedOperationPlugin'; + +/** + * Add weights to operations based on the operation settings in rush-project.json. + * + * This also sets the weight of no-op operations to 0.01. + */ +export class WeightedOperationPlugin implements IPhasedCommandPlugin { + public apply(hooks: PhasedCommandHooks): void { + hooks.createOperations.tap(PLUGIN_NAME, weightOperations); + } +} + +function weightOperations(operations: Set, context: ICreateOperationsContext): Set { + const { projectConfigurations } = context; + + for (const operation of operations) { + const { associatedProject: project, associatedPhase: phase } = operation; + if (operation.runner?.isNoOp) { + operation.weight = 0.01; + } else if (project && phase) { + const projectConfiguration: RushProjectConfiguration | undefined = projectConfigurations.get(project); + const operationSettings: IOperationSettings | undefined = + projectConfiguration?.operationSettingsByOperationName.get(phase.name); + if (operationSettings?.weight) { + operation.weight = operationSettings.weight; + } + } + } + return operations; +} diff --git a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts index 3afd3769d9..ebcdc71eb7 100644 --- a/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts +++ b/libraries/rush-lib/src/logic/operations/test/AsyncOperationQueue.test.ts @@ -47,7 +47,7 @@ describe(AsyncOperationQueue.name, () => { const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, nullSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -76,7 +76,7 @@ describe(AsyncOperationQueue.name, () => { const queue: AsyncOperationQueue = new AsyncOperationQueue(operations, customSort); for await (const operation of queue) { actualOrder.push(operation); - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -135,7 +135,7 @@ describe(AsyncOperationQueue.name, () => { await Promise.all( Array.from({ length: 3 }, async () => { for await (const operation of queue) { - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { hasUnassignedOperation = true; continue; } @@ -184,7 +184,7 @@ describe(AsyncOperationQueue.name, () => { let remoteExecuted: boolean = false; for await (const operation of queue) { let record: OperationExecutionRecord | undefined; - if (operation === UNASSIGNED_OPERATION) { + if (operation.status === UNASSIGNED_OPERATION) { await Async.sleep(100); record = queue.tryGetRemoteExecutingOperation(); } else { diff --git a/libraries/rush-lib/src/schemas/rush-project.schema.json b/libraries/rush-lib/src/schemas/rush-project.schema.json index d309a568e4..3be3dc3416 100644 --- a/libraries/rush-lib/src/schemas/rush-project.schema.json +++ b/libraries/rush-lib/src/schemas/rush-project.schema.json @@ -72,6 +72,11 @@ "disableBuildCacheForOperation": { "description": "Disable caching for this operation. The operation will never be restored from cache. This may be useful if this operation affects state outside of its folder.", "type": "boolean" + }, + + "weight": { + "description": "The number of concurrency units that this operation should take up. The maximum concurrency units is determined by the -p flag.", + "type": "number" } } } From f16e6a97e2303767e0e799dc8e223f1158f6e925 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:12:06 -0400 Subject: [PATCH 03/28] add test cases for async weighted --- .../node-core-library/src/test/Async.test.ts | 78 +++++++++++++++++-- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index cbc8dfd625..57427996c9 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -358,19 +358,29 @@ describe(Async.name, () => { it.each([ { - concurrency: 2, - weight: 0.5, + concurrency: 4, + weight: 4, + expectedConcurrency: 1 + }, + { + concurrency: 4, + weight: 1, expectedConcurrency: 4 }, { - concurrency: 1, - weight: 0.4, + concurrency: 3, + weight: 1, + expectedConcurrency: 3 + }, + { + concurrency: 6, + weight: 2, expectedConcurrency: 3 }, { - concurrency: 1, - weight: 0.17, - expectedConcurrency: 6 + concurrency: 12, + weight: 3, + expectedConcurrency: 4 } ])( 'if concurrency is set to $concurrency with operation weight $weight, ensures no more than $expectedConcurrency operations occur in parallel', @@ -392,6 +402,60 @@ describe(Async.name, () => { expect(maxRunning).toEqual(expectedConcurrency); } ); + + it('ensures that a large operation cannot be scheduled around', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 1 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 1 }, + { n: 7, weight: 5 }, + { n: 8, weight: 1 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + + it('waits for a large operation to finish before scheduling more', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [ + { n: 1, weight: 1 }, + { n: 2, weight: 10 }, + { n: 3, weight: 1 }, + { n: 4, weight: 10 }, + { n: 5, weight: 1 }, + { n: 6, weight: 10 }, + { n: 7, weight: 1 }, + { n: 8, weight: 10 } + ]; + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(2); + }); }); describe(Async.runWithRetriesAsync.name, () => { From cc5b1df4951b554d5b9484048f1c589540c32f23 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:13:55 -0400 Subject: [PATCH 04/28] add a test case for weight 0 --- .../node-core-library/src/test/Async.test.ts | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 57427996c9..e7c02640b7 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -456,6 +456,28 @@ describe(Async.name, () => { expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(2); }); + + it('allows operations with a weight of 0 and schedules them accordingly', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ n, weight: 0 })); + + array.unshift({ n: 9, weight: 3 }); + + array.push({ n: 10, weight: 3 }); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(10); + expect(maxRunning).toEqual(9); + }); }); describe(Async.runWithRetriesAsync.name, () => { From 8ed8fb87f19f87fb4a932f55f38784728c6ecdbf Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:32:36 -0400 Subject: [PATCH 05/28] add a header comment --- libraries/node-core-library/src/Async.ts | 27 +++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index c0930961b9..d52601fecd 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -83,6 +83,31 @@ export class Async { return result; } + /** + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. + * + * @remarks + * This API is similar to the system `Array#forEachAsync`, except that each item can have + * a weight that determines how many concurrent operations are allowed. `Array#forEachAsync` + * is a special case of this method where weight = 1 for all items. + * + * The maximum number of concurrent operations can still be throttled using + * {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the + * maximum number of operations that can be in progress at once. Instead, it determines the + * number of concurrency units that can be in progress at once. The weight of each operation + * determines how many concurrency units it takes up. For example, if the concurrency is 2 + * and the first operation has a weight of 2, then only one more operation can be in progress. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param iterable - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + */ public static async forEachWeightedAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, @@ -117,7 +142,7 @@ export class Async { if (!iteratorIsComplete) { // Typescript refuses to narrow the typing here even though gets narrowed with `!currentIteratorResult.done` in the if. - const weight: number = (currentIteratorResult.value as TEntry).weight; + const weight: number = (currentIteratorResult.value as TEntry).weight ?? 1; // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. concurrentUnitsInProgress += weight - 1; Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) From dbc8402b38a0e3dc1d6be8c0d48531dc0849493c Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:33:15 -0400 Subject: [PATCH 06/28] fix-api-report --- common/reviews/api/node-core-library.api.md | 1 - 1 file changed, 1 deletion(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index c480216737..b8e0cb3586 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -26,7 +26,6 @@ export class AlreadyReportedError extends Error { // @public export class Async { static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; - // (undocumented) static forEachWeightedAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; From 0a1ebc59717453d78ada0510ca026f178ef67329 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:37:16 -0400 Subject: [PATCH 07/28] fix weightedoperationplugin --- .../operations/WeightedOperationPlugin.ts | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index 271cb23bd4..85b83a2335 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -8,27 +8,33 @@ import type { PhasedCommandHooks } from '../../pluginFramework/PhasedCommandHooks'; import type { IOperationSettings, RushProjectConfiguration } from '../../api/RushProjectConfiguration'; +import { IOperationExecutionResult } from './IOperationExecutionResult'; +import { OperationExecutionRecord } from './OperationExecutionRecord'; const PLUGIN_NAME: 'WeightedOperationPlugin' = 'WeightedOperationPlugin'; /** * Add weights to operations based on the operation settings in rush-project.json. * - * This also sets the weight of no-op operations to 0.01. + * This also sets the weight of no-op operations to 0. */ export class WeightedOperationPlugin implements IPhasedCommandPlugin { public apply(hooks: PhasedCommandHooks): void { - hooks.createOperations.tap(PLUGIN_NAME, weightOperations); + hooks.beforeExecuteOperations.tap(PLUGIN_NAME, weightOperations); } } -function weightOperations(operations: Set, context: ICreateOperationsContext): Set { +function weightOperations( + operations: Map, + context: ICreateOperationsContext +): Map { const { projectConfigurations } = context; - for (const operation of operations) { + for (const [operation, record] of operations) { + const { runner } = record as OperationExecutionRecord; const { associatedProject: project, associatedPhase: phase } = operation; - if (operation.runner?.isNoOp) { - operation.weight = 0.01; + if (runner!.isNoOp) { + operation.weight = 0; } else if (project && phase) { const projectConfiguration: RushProjectConfiguration | undefined = projectConfigurations.get(project); const operationSettings: IOperationSettings | undefined = @@ -37,6 +43,11 @@ function weightOperations(operations: Set, context: ICreateOperations operation.weight = operationSettings.weight; } } + if (operation.weight < 0) { + throw new Error(`The weight of the operation '${operation.name}' cannot be negative.`); + } else if (operation.weight % 1 !== 0) { + throw new Error(`The weight of the operation '${operation.name}' cannot be a decimal.`); + } } return operations; } From fed6a2395375ae7b871f58897d755a3a896a8671 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 18:51:49 -0400 Subject: [PATCH 08/28] add changesets --- .../rush/sennyeya-weighted-graph_2024-05-01-22-51.json | 10 ++++++++++ .../sennyeya-weighted-graph_2024-05-01-22-51.json | 10 ++++++++++ 2 files changed, 20 insertions(+) create mode 100644 common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json create mode 100644 common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json diff --git a/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json new file mode 100644 index 0000000000..3067736d0e --- /dev/null +++ b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@microsoft/rush", + "comment": "Adds weighted parallelism to operations. Operations can now define an integer weight for how much parallelism they want to take up.", + "type": "none" + } + ], + "packageName": "@microsoft/rush" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json new file mode 100644 index 0000000000..4f95666bbe --- /dev/null +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "A new `Async.forEachWeightedAsync` method allows weighted parallelism where operations can have more or less than a single parallelism unit.", + "type": "minor" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file From f935016e7b4859f0f7525caa122dda143db40952 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 20:11:53 -0400 Subject: [PATCH 09/28] fix linting --- .../rush-lib/src/logic/operations/WeightedOperationPlugin.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index 85b83a2335..d502d674a7 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -8,8 +8,8 @@ import type { PhasedCommandHooks } from '../../pluginFramework/PhasedCommandHooks'; import type { IOperationSettings, RushProjectConfiguration } from '../../api/RushProjectConfiguration'; -import { IOperationExecutionResult } from './IOperationExecutionResult'; -import { OperationExecutionRecord } from './OperationExecutionRecord'; +import type { IOperationExecutionResult } from './IOperationExecutionResult'; +import type { OperationExecutionRecord } from './OperationExecutionRecord'; const PLUGIN_NAME: 'WeightedOperationPlugin' = 'WeightedOperationPlugin'; From 06c7679f01b9f9d6267bd9d580f9c401c62ddbb5 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:34:25 -0400 Subject: [PATCH 10/28] move the weighting behavior into an overload --- common/reviews/api/node-core-library.api.md | 9 +- libraries/node-core-library/src/Async.ts | 128 ++++++++---------- .../node-core-library/src/test/Async.test.ts | 14 +- .../operations/OperationExecutionManager.ts | 5 +- 4 files changed, 73 insertions(+), 83 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index b8e0cb3586..568b8fbdc7 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -25,8 +25,11 @@ export class AlreadyReportedError extends Error { // @public export class Async { - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; - static forEachWeightedAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions): Promise; + // (undocumented) + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { + weighted: true; + }): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; @@ -606,7 +609,7 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { // @public (undocumented) export interface IWeightedIterable { // (undocumented) - weight: number; + weight?: number; } // @public diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index d52601fecd..fc40ad90c3 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -5,14 +5,14 @@ * Options for controlling the parallelism of asynchronous operations. * * @remarks - * Used with {@link Async.mapAsync} and {@link Async.forEachAsync}. + * Used with {@link Async.mapAsync} and {@link (Async:class).(forEachAsync:2)}. * * @public */ export interface IAsyncParallelismOptions { /** - * Optionally used with the {@link Async.mapAsync} and {@link Async.forEachAsync} - * to limit the maximum number of concurrent promises to the specified number. + * Optionally used with the {@link Async.mapAsync}, {@link (Async:class).(forEachAsync:1)}, and + * {@link (Async:class).(forEachAsync:2)} to limit the maximum number of concurrent promises to the specified number. */ concurrency?: number; } @@ -31,12 +31,39 @@ export interface IRunWithRetriesOptions { /** * @remarks - * Used with {@link Async.forEachWeightedAsync}. + * Used with {@link (Async:class).(forEachAsync:2)}. * * @public */ export interface IWeightedIterable { - weight: number; + weight?: number; +} + +function getWeight(element: T): number | undefined { + if (typeof element === 'object' && element !== null) { + return 'weight' in element ? (element.weight as number) : undefined; + } + return undefined; +} + +function toWeightedIterator( + iterable: Iterable | AsyncIterable +): AsyncIterable<{ element: TEntry; weighted?: number }> { + const iterator: Iterator | AsyncIterator = ( + (iterable as Iterable)[Symbol.iterator] || + (iterable as AsyncIterable)[Symbol.asyncIterator] + ).call(iterable); + return { + [Symbol.asyncIterator]: () => ({ + next: async () => { + const { value, done } = await Promise.resolve(iterator.next()); + return { + value: { element: value, weight: getWeight(value) }, + done: done ?? false + }; + } + }) + }; } /** @@ -108,9 +135,9 @@ export class Async { * from the array * @param options - options for customizing the control flow */ - public static async forEachWeightedAsync( + private static async _forEachWeightedAsync( iterable: Iterable | AsyncIterable, - callback: (entry: TEntry, arrayIndex: number) => Promise, + callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined ): Promise { await new Promise((resolve: () => void, reject: (error: Error) => void) => { @@ -141,11 +168,11 @@ export class Async { iteratorIsComplete = !!currentIteratorResult.done; if (!iteratorIsComplete) { - // Typescript refuses to narrow the typing here even though gets narrowed with `!currentIteratorResult.done` in the if. - const weight: number = (currentIteratorResult.value as TEntry).weight ?? 1; + const currentIteratorValue: TEntry = currentIteratorResult.value; + const weight: number = currentIteratorValue.weight ?? 1; // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. concurrentUnitsInProgress += weight - 1; - Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) + Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { concurrentUnitsInProgress -= weight; await onOperationCompletionAsync(); @@ -200,72 +227,33 @@ export class Async { * @param callback - a function that starts an asynchronous promise for an element * from the array * @param options - options for customizing the control flow + * + * {@label UNWEIGHTED} */ public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, - options?: IAsyncParallelismOptions | undefined - ): Promise { - await new Promise((resolve: () => void, reject: (error: Error) => void) => { - const concurrency: number = - options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity; - let operationsInProgress: number = 0; - - const iterator: Iterator | AsyncIterator = ( - (iterable as Iterable)[Symbol.iterator] || - (iterable as AsyncIterable)[Symbol.asyncIterator] - ).call(iterable); - - let arrayIndex: number = 0; - let iteratorIsComplete: boolean = false; - let promiseHasResolvedOrRejected: boolean = false; - - async function queueOperationsAsync(): Promise { - while (operationsInProgress < concurrency && !iteratorIsComplete && !promiseHasResolvedOrRejected) { - // Increment the concurrency while waiting for the iterator. - // This function is reentrant, so this ensures that at most `concurrency` executions are waiting - operationsInProgress++; - const currentIteratorResult: IteratorResult = await iterator.next(); - // eslint-disable-next-line require-atomic-updates - iteratorIsComplete = !!currentIteratorResult.done; - - if (!iteratorIsComplete) { - Promise.resolve(callback(currentIteratorResult.value, arrayIndex++)) - .then(async () => { - operationsInProgress--; - await onOperationCompletionAsync(); - }) - .catch((error) => { - promiseHasResolvedOrRejected = true; - reject(error); - }); - } else { - // The iterator is complete and there wasn't a value, so untrack the waiting state. - operationsInProgress--; - } - } - - if (iteratorIsComplete) { - await onOperationCompletionAsync(); - } - } + options?: IAsyncParallelismOptions + ): Promise; - async function onOperationCompletionAsync(): Promise { - if (!promiseHasResolvedOrRejected) { - if (operationsInProgress === 0 && iteratorIsComplete) { - promiseHasResolvedOrRejected = true; - resolve(); - } else if (!iteratorIsComplete) { - await queueOperationsAsync(); - } - } - } + /** + * {@label WEIGHTED} + */ + public static async forEachAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options: IAsyncParallelismOptions & { weighted: true } + ): Promise; + public static async forEachAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: IAsyncParallelismOptions & { weighted?: true } + ): Promise { + if (options?.weighted) { + return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options); + } - queueOperationsAsync().catch((error) => { - promiseHasResolvedOrRejected = true; - reject(error); - }); - }); + return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options); } /** diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index e7c02640b7..70825f5cd2 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -312,9 +312,7 @@ describe(Async.name, () => { Async.forEachAsync(syncIterable, async (item) => await Async.sleep(0)) ).rejects.toThrow(expectedError); }); - }); - describe(Async.forEachWeightedAsync.name, () => { interface INumberWithWeight { n: number; weight: number; @@ -333,7 +331,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(0); expect(maxRunning).toEqual(0); }); @@ -351,7 +349,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(3); }); @@ -397,7 +395,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency }); + await Async.forEachAsync(array, fn, { concurrency, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(expectedConcurrency); } @@ -425,7 +423,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(3); }); @@ -452,7 +450,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(8); expect(maxRunning).toEqual(2); }); @@ -474,7 +472,7 @@ describe(Async.name, () => { running--; }); - await Async.forEachWeightedAsync(array, fn, { concurrency: 3 }); + await Async.forEachAsync(array, fn, { concurrency: 3, weighted: true }); expect(fn).toHaveBeenCalledTimes(10); expect(maxRunning).toEqual(9); }); diff --git a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts index 554848d7b0..db86ca8e19 100644 --- a/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts +++ b/libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts @@ -253,7 +253,7 @@ export class OperationExecutionManager { return await this._beforeExecuteOperation?.(record); }; - await Async.forEachWeightedAsync( + await Async.forEachAsync( this._executionQueue, async (operation: IOperationIteratorResult) => { let record: OperationExecutionRecord | undefined; @@ -281,7 +281,8 @@ export class OperationExecutionManager { } }, { - concurrency: maxParallelism + concurrency: maxParallelism, + weighted: true } ); From 32ad704d761be8cc17e44c2b31afcfe747617d0f Mon Sep 17 00:00:00 2001 From: Aramis Sennyey <159921952+aramissennyeydd@users.noreply.github.com> Date: Wed, 1 May 2024 22:35:18 -0400 Subject: [PATCH 11/28] Apply suggestions from code review Co-authored-by: Ian Clanton-Thuon --- .../rush/sennyeya-weighted-graph_2024-05-01-22-51.json | 2 +- .../sennyeya-weighted-graph_2024-05-01-22-51.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json index 3067736d0e..f2a6516155 100644 --- a/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json +++ b/common/changes/@microsoft/rush/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@microsoft/rush", - "comment": "Adds weighted parallelism to operations. Operations can now define an integer weight for how much parallelism they want to take up.", + "comment": "Add a `\"weight\"` property to the `\"operation\"` object in the project `config/rush-project.json` file that defines an integer weight for how much of the allowed parallelism the operation uses.", "type": "none" } ], diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json index 4f95666bbe..bff2e9c550 100644 --- a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -2,7 +2,7 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "A new `Async.forEachWeightedAsync` method allows weighted parallelism where operations can have more or less than a single parallelism unit.", + "comment": "Add a new `Async.forEachWeightedAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", "type": "minor" } ], From 1ac2b95ff957481dfc8f85606bf2c739065c7652 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:36:42 -0400 Subject: [PATCH 12/28] update changeset --- .../sennyeya-weighted-graph_2024-05-01-22-51.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json index bff2e9c550..c0dd13175b 100644 --- a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-01-22-51.json @@ -2,9 +2,9 @@ "changes": [ { "packageName": "@rushstack/node-core-library", - "comment": "Add a new `Async.forEachWeightedAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", + "comment": "Add a new `weighted: true` option to the `Async.forEachAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", "type": "minor" } ], "packageName": "@rushstack/node-core-library" -} \ No newline at end of file +} From 75701f75acd40d2675a3c533fe8b3eec9da7fc00 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:41:14 -0400 Subject: [PATCH 13/28] remove unnecessary tsdoc things --- libraries/node-core-library/src/Async.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index fc40ad90c3..191aa8f7d8 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -227,18 +227,12 @@ export class Async { * @param callback - a function that starts an asynchronous promise for an element * from the array * @param options - options for customizing the control flow - * - * {@label UNWEIGHTED} */ public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions ): Promise; - - /** - * {@label WEIGHTED} - */ public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, From 56a70a630198e2b57a19bc90dbb43266940718bf Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:48:57 -0400 Subject: [PATCH 14/28] make weight required --- libraries/node-core-library/src/Async.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 191aa8f7d8..5a4dd03bbf 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -36,7 +36,7 @@ export interface IRunWithRetriesOptions { * @public */ export interface IWeightedIterable { - weight?: number; + weight: number; } function getWeight(element: T): number | undefined { From a04fe82b2d39e54048462a2465801d1261b5b4b7 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:49:44 -0400 Subject: [PATCH 15/28] fix api report --- common/reviews/api/node-core-library.api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 568b8fbdc7..3253ff665e 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -609,7 +609,7 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { // @public (undocumented) export interface IWeightedIterable { // (undocumented) - weight?: number; + weight: number; } // @public From 86ae47a41758566dc7219a4ea367feeba87c913e Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:52:32 -0400 Subject: [PATCH 16/28] only use weights when weighted is set to true --- libraries/node-core-library/src/Async.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 5a4dd03bbf..e21c73b40a 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -47,7 +47,8 @@ function getWeight(element: T): number | undefined { } function toWeightedIterator( - iterable: Iterable | AsyncIterable + iterable: Iterable | AsyncIterable, + useWeights: boolean = false ): AsyncIterable<{ element: TEntry; weighted?: number }> { const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || @@ -58,8 +59,8 @@ function toWeightedIterator( next: async () => { const { value, done } = await Promise.resolve(iterator.next()); return { - value: { element: value, weight: getWeight(value) }, - done: done ?? false + value: { element: value, weight: useWeights ? getWeight(value) : 1 }, + done }; } }) @@ -244,7 +245,7 @@ export class Async { options?: IAsyncParallelismOptions & { weighted?: true } ): Promise { if (options?.weighted) { - return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options); + return Async._forEachWeightedAsync(toWeightedIterator(iterable, true), callback, options); } return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options); From 63857ef0333b93e4c33db412ca3cc87ff4a9527c Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Wed, 1 May 2024 22:56:18 -0400 Subject: [PATCH 17/28] add a test for weighted being disabled --- .../node-core-library/src/test/Async.test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/libraries/node-core-library/src/test/Async.test.ts b/libraries/node-core-library/src/test/Async.test.ts index 70825f5cd2..0060f33805 100644 --- a/libraries/node-core-library/src/test/Async.test.ts +++ b/libraries/node-core-library/src/test/Async.test.ts @@ -354,6 +354,24 @@ describe(Async.name, () => { expect(maxRunning).toEqual(3); }); + it('if concurrency is set but weighted is not, ensures no more than N operations occur in parallel and ignores operation weight', async () => { + let running: number = 0; + let maxRunning: number = 0; + + const array: INumberWithWeight[] = [1, 2, 3, 4, 5, 6, 7, 8].map((n) => ({ weight: 2, n })); + + const fn: (item: INumberWithWeight) => Promise = jest.fn(async (item) => { + running++; + await Async.sleep(0); + maxRunning = Math.max(maxRunning, running); + running--; + }); + + await Async.forEachAsync(array, fn, { concurrency: 3 }); + expect(fn).toHaveBeenCalledTimes(8); + expect(maxRunning).toEqual(3); + }); + it.each([ { concurrency: 4, From 34067348c58ff0d2faf431b3ca1c6ae81bdc9b01 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Thu, 2 May 2024 10:27:35 -0400 Subject: [PATCH 18/28] moving the comment to the public function --- libraries/node-core-library/src/Async.ts | 55 +++++++++++++----------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index e21c73b40a..7fd4ffd733 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -36,6 +36,10 @@ export interface IRunWithRetriesOptions { * @public */ export interface IWeightedIterable { + /** + * The weight of the element, used to determine the concurrency units that it will take up. + * Must be a whole number >= 0. + */ weight: number; } @@ -111,31 +115,6 @@ export class Async { return result; } - /** - * Given an input array and a `callback` function, invoke the callback to start a - * promise for each element in the array. - * - * @remarks - * This API is similar to the system `Array#forEachAsync`, except that each item can have - * a weight that determines how many concurrent operations are allowed. `Array#forEachAsync` - * is a special case of this method where weight = 1 for all items. - * - * The maximum number of concurrent operations can still be throttled using - * {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the - * maximum number of operations that can be in progress at once. Instead, it determines the - * number of concurrency units that can be in progress at once. The weight of each operation - * determines how many concurrency units it takes up. For example, if the concurrency is 2 - * and the first operation has a weight of 2, then only one more operation can be in progress. - * - * If `callback` throws a synchronous exception, or if it returns a promise that rejects, - * then the loop stops immediately. Any remaining array items will be skipped, and - * overall operation will reject with the first error that was encountered. - * - * @param iterable - the array of inputs for the callback function - * @param callback - a function that starts an asynchronous promise for an element - * from the array - * @param options - options for customizing the control flow - */ private static async _forEachWeightedAsync( iterable: Iterable | AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, @@ -234,6 +213,32 @@ export class Async { callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions ): Promise; + + /** + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. + * + * @remarks + * This API is similar to the other `Array#forEachAsync`, except that each item can have + * a weight that determines how many concurrent operations are allowed. The unweighted + * `Array#forEachAsync` is a special case of this method where weight = 1 for all items. + * + * The maximum number of concurrent operations can still be throttled using + * {@link IAsyncParallelismOptions.concurrency}, however it no longer determines the + * maximum number of operations that can be in progress at once. Instead, it determines the + * number of concurrency units that can be in progress at once. The weight of each operation + * determines how many concurrency units it takes up. For example, if the concurrency is 2 + * and the first operation has a weight of 2, then only one more operation can be in progress. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param iterable - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + */ public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, From 80dae8fc287245eeb4ba5bf7a575203db306c2c7 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Thu, 2 May 2024 10:31:54 -0400 Subject: [PATCH 19/28] fix linting concern --- common/reviews/api/node-core-library.api.md | 2 -- libraries/node-core-library/src/Async.ts | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 3253ff665e..fb27ebf356 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -26,7 +26,6 @@ export class AlreadyReportedError extends Error { // @public export class Async { static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions): Promise; - // (undocumented) static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { weighted: true; }): Promise; @@ -608,7 +607,6 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { // @public (undocumented) export interface IWeightedIterable { - // (undocumented) weight: number; } diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 7fd4ffd733..e311dba52b 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -38,7 +38,7 @@ export interface IRunWithRetriesOptions { export interface IWeightedIterable { /** * The weight of the element, used to determine the concurrency units that it will take up. - * Must be a whole number >= 0. + * Must be a whole number greater than or equal to 0. */ weight: number; } From ca1ffc69819a1d0ab46603a60263e50492df1174 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Thu, 2 May 2024 13:26:28 -0400 Subject: [PATCH 20/28] handle larger than concurrency weights --- libraries/node-core-library/src/Async.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index e311dba52b..3b3b398a5f 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -149,8 +149,10 @@ export class Async { if (!iteratorIsComplete) { const currentIteratorValue: TEntry = currentIteratorResult.value; - const weight: number = currentIteratorValue.weight ?? 1; + const weight: number = Math.min(currentIteratorValue.weight ?? 1, concurrency); // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. + // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted + // operations are present. concurrentUnitsInProgress += weight - 1; Promise.resolve(callback(currentIteratorValue.element, arrayIndex++)) .then(async () => { From cc2a498d24742a0707b6ebaeb86e7e9b818a772e Mon Sep 17 00:00:00 2001 From: Aramis Sennyey <159921952+aramissennyeydd@users.noreply.github.com> Date: Fri, 3 May 2024 14:48:25 -0400 Subject: [PATCH 21/28] Apply suggestions from code review Co-authored-by: Ian Clanton-Thuon --- libraries/node-core-library/src/Async.ts | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 3b3b398a5f..a05c7a9304 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -44,10 +44,7 @@ export interface IWeightedIterable { } function getWeight(element: T): number | undefined { - if (typeof element === 'object' && element !== null) { - return 'weight' in element ? (element.weight as number) : undefined; - } - return undefined; + return (element as unknown as IWeightedIterable)?.weight; } function toWeightedIterator( @@ -63,7 +60,7 @@ function toWeightedIterator( next: async () => { const { value, done } = await Promise.resolve(iterator.next()); return { - value: { element: value, weight: useWeights ? getWeight(value) : 1 }, + value: { element: value, weight: useWeights ? (value as unknown as IWeightedIterator).weight : 1 }, done }; } @@ -249,13 +246,9 @@ export class Async { public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, - options?: IAsyncParallelismOptions & { weighted?: true } + options?: IAsyncParallelismOptions ): Promise { - if (options?.weighted) { - return Async._forEachWeightedAsync(toWeightedIterator(iterable, true), callback, options); - } - - return Async._forEachWeightedAsync(toWeightedIterator(iterable), callback, options); + await Async._forEachWeightedAsync(toWeightedIterator(iterable, options?.weighted), callback, options); } /** From a4bbbff1c9ee33ac95ce22a7526b2b9896c6c464 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey <159921952+aramissennyeydd@users.noreply.github.com> Date: Fri, 3 May 2024 15:30:19 -0400 Subject: [PATCH 22/28] Update libraries/rush-lib/src/schemas/rush-project.schema.json Co-authored-by: Ian Clanton-Thuon --- libraries/rush-lib/src/schemas/rush-project.schema.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/rush-lib/src/schemas/rush-project.schema.json b/libraries/rush-lib/src/schemas/rush-project.schema.json index 3be3dc3416..b6f171ddef 100644 --- a/libraries/rush-lib/src/schemas/rush-project.schema.json +++ b/libraries/rush-lib/src/schemas/rush-project.schema.json @@ -76,7 +76,8 @@ "weight": { "description": "The number of concurrency units that this operation should take up. The maximum concurrency units is determined by the -p flag.", - "type": "number" + "type": "integer", + "minimum": 0 } } } From 6163ca98c7864b344e4e81da16c860fba8b3698b Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 May 2024 15:30:41 -0400 Subject: [PATCH 23/28] address code review questions --- common/reviews/api/node-core-library.api.md | 4 +++ libraries/node-core-library/src/Async.ts | 33 ++++++++++++------- .../operations/WeightedOperationPlugin.ts | 7 ++-- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index fb27ebf356..6d70dc8f6e 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -33,6 +33,8 @@ export class Async { static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; + // (undocumented) + static validateWeightedIterable(operation: IWeightedIterable): void; } // @public @@ -228,6 +230,8 @@ export type FolderItem = fs.Dirent; // @public export interface IAsyncParallelismOptions { concurrency?: number; + // (undocumented) + weighted?: boolean; } // @public diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index a05c7a9304..cb19635860 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -15,6 +15,8 @@ export interface IAsyncParallelismOptions { * {@link (Async:class).(forEachAsync:2)} to limit the maximum number of concurrent promises to the specified number. */ concurrency?: number; + + weighted?: boolean; } /** @@ -43,25 +45,22 @@ export interface IWeightedIterable { weight: number; } -function getWeight(element: T): number | undefined { - return (element as unknown as IWeightedIterable)?.weight; -} - function toWeightedIterator( iterable: Iterable | AsyncIterable, - useWeights: boolean = false -): AsyncIterable<{ element: TEntry; weighted?: number }> { - const iterator: Iterator | AsyncIterator = ( + useWeights?: boolean +): AsyncIterable<{ element: TEntry; weight: number }> { + const iterator: Iterator | AsyncIterator = ( (iterable as Iterable)[Symbol.iterator] || (iterable as AsyncIterable)[Symbol.asyncIterator] ).call(iterable); return { [Symbol.asyncIterator]: () => ({ next: async () => { - const { value, done } = await Promise.resolve(iterator.next()); + // The await is necessary here, but TS will complain - it's a false positive. + const { value, done } = await iterator.next(); return { - value: { element: value, weight: useWeights ? (value as unknown as IWeightedIterator).weight : 1 }, - done + value: { element: value, weight: useWeights ? value?.weight : 1 }, + done: !!done }; } }) @@ -112,7 +111,7 @@ export class Async { return result; } - private static async _forEachWeightedAsync( + private static async _forEachWeightedAsync( iterable: Iterable | AsyncIterable, callback: (entry: TReturn, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined @@ -146,7 +145,8 @@ export class Async { if (!iteratorIsComplete) { const currentIteratorValue: TEntry = currentIteratorResult.value; - const weight: number = Math.min(currentIteratorValue.weight ?? 1, concurrency); + Async.validateWeightedIterable(currentIteratorValue); + const weight: number = Math.min(currentIteratorValue.weight, concurrency); // If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1. // Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted // operations are present. @@ -283,6 +283,15 @@ export class Async { } } + public static validateWeightedIterable(operation: IWeightedIterable): void { + if (operation.weight < 0) { + throw new Error('Weight must be a whole number greater than or equal to 0'); + } + if (operation.weight % 1 !== 0) { + throw new Error('Weight must be a whole number greater than or equal to 0'); + } + } + /** * Returns a Signal, a.k.a. a "deferred promise". */ diff --git a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts index d502d674a7..c33e597154 100644 --- a/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/WeightedOperationPlugin.ts @@ -10,6 +10,7 @@ import type { import type { IOperationSettings, RushProjectConfiguration } from '../../api/RushProjectConfiguration'; import type { IOperationExecutionResult } from './IOperationExecutionResult'; import type { OperationExecutionRecord } from './OperationExecutionRecord'; +import { Async } from '@rushstack/node-core-library'; const PLUGIN_NAME: 'WeightedOperationPlugin' = 'WeightedOperationPlugin'; @@ -43,11 +44,7 @@ function weightOperations( operation.weight = operationSettings.weight; } } - if (operation.weight < 0) { - throw new Error(`The weight of the operation '${operation.name}' cannot be negative.`); - } else if (operation.weight % 1 !== 0) { - throw new Error(`The weight of the operation '${operation.name}' cannot be a decimal.`); - } + Async.validateWeightedIterable(operation); } return operations; } From 343ef0ea1f908750dc04f806f4d9e4daacc4570e Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 May 2024 15:49:13 -0400 Subject: [PATCH 24/28] add documentation for weighted --- libraries/node-core-library/src/Async.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index cb19635860..903da67428 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -16,6 +16,10 @@ export interface IAsyncParallelismOptions { */ concurrency?: number; + /** + * Optionally used with the {@link (Async:class).(forEachAsync:2)} to enable weighted operations where an operation can + * take up more or less than one concurrency unit. + */ weighted?: boolean; } From 27ce6a1d52b5431b050e6a0c67cd5d8664ef1862 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 May 2024 15:55:48 -0400 Subject: [PATCH 25/28] fix api report --- common/reviews/api/node-core-library.api.md | 1 - 1 file changed, 1 deletion(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 6d70dc8f6e..4cedc86076 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -230,7 +230,6 @@ export type FolderItem = fs.Dirent; // @public export interface IAsyncParallelismOptions { concurrency?: number; - // (undocumented) weighted?: boolean; } From 84d8ce8bee311d04811fbfa92c8d8a41117544ad Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 May 2024 18:15:42 -0400 Subject: [PATCH 26/28] add weights for map too --- ...nyeya-weighted-graph_2024-05-03-22-13.json | 10 ++++ common/reviews/api/node-core-library.api.md | 21 ++++++-- libraries/node-core-library/src/Async.ts | 49 ++++++++++++++++--- libraries/node-core-library/src/index.ts | 8 +-- 4 files changed, 69 insertions(+), 19 deletions(-) create mode 100644 common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json diff --git a/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json new file mode 100644 index 0000000000..9dff8a4060 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/sennyeya-weighted-graph_2024-05-03-22-13.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@rushstack/node-core-library", + "comment": "Add a new `weighted: true` option to the `Async.mapAsync` method that allows each element to specify how much of the allowed parallelism the callback uses.", + "type": "patch" + } + ], + "packageName": "@rushstack/node-core-library" +} \ No newline at end of file diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 4cedc86076..c73301e02c 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -25,16 +25,27 @@ export class AlreadyReportedError extends Error { // @public export class Async { - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions): Promise; - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: { + concurrency: number; + weighted?: false; + } | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: { + concurrency: number; weighted: true; }): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; - static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: IAsyncParallelismOptions | undefined): Promise; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: { + concurrency: number; + weighted?: false; + } | undefined): Promise; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: { + concurrency: number; + weighted: true; + } | undefined): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; // (undocumented) - static validateWeightedIterable(operation: IWeightedIterable): void; + static validateWeightedIterable(operation: IWeighted): void; } // @public @@ -609,7 +620,7 @@ export interface IWaitForExitWithStringOptions extends IWaitForExitOptions { } // @public (undocumented) -export interface IWeightedIterable { +export interface IWeighted { weight: number; } diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 903da67428..6d1267e51e 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -5,14 +5,16 @@ * Options for controlling the parallelism of asynchronous operations. * * @remarks - * Used with {@link Async.mapAsync} and {@link (Async:class).(forEachAsync:2)}. + * Used with {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and + * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)}. * * @public */ export interface IAsyncParallelismOptions { /** - * Optionally used with the {@link Async.mapAsync}, {@link (Async:class).(forEachAsync:1)}, and - * {@link (Async:class).(forEachAsync:2)} to limit the maximum number of concurrent promises to the specified number. + * Optionally used with the {@link (Async:class).(mapAsync:1)}, {@link (Async:class).(mapAsync:2)} and + * {@link (Async:class).(forEachAsync:1)}, and {@link (Async:class).(forEachAsync:2)} to limit the maximum + * number of concurrent promises to the specified number. */ concurrency?: number; @@ -41,7 +43,7 @@ export interface IRunWithRetriesOptions { * * @public */ -export interface IWeightedIterable { +export interface IWeighted { /** * The weight of the element, used to determine the concurrency units that it will take up. * Must be a whole number greater than or equal to 0. @@ -97,6 +99,38 @@ export class Async { * @returns an array containing the result for each callback, in the same order * as the original input `array` */ + public static async mapAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined + ): Promise; + + /** + * Given an input array and a `callback` function, invoke the callback to start a + * promise for each element in the array. Returns an array containing the results. + * + * @remarks + * This API is similar to the system `Array#map`, except that the loop is asynchronous, + * and the maximum number of concurrent units can be throttled + * using {@link IAsyncParallelismOptions.concurrency}. Using the {@link IAsyncParallelismOptions.weighted} + * option, the weight of each operation can be specified, which determines how many concurrent units it takes up. + * + * If `callback` throws a synchronous exception, or if it returns a promise that rejects, + * then the loop stops immediately. Any remaining array items will be skipped, and + * overall operation will reject with the first error that was encountered. + * + * @param iterable - the array of inputs for the callback function + * @param callback - a function that starts an asynchronous promise for an element + * from the array + * @param options - options for customizing the control flow + * @returns an array containing the result for each callback, in the same order + * as the original input `array` + */ + public static async mapAsync( + iterable: Iterable | AsyncIterable, + callback: (entry: TEntry, arrayIndex: number) => Promise, + options: IAsyncParallelismOptions & { weighted: true } + ): Promise; public static async mapAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, @@ -104,6 +138,7 @@ export class Async { ): Promise { const result: TRetVal[] = []; + // @ts-expect-error https://github.com/microsoft/TypeScript/issues/22609, it succeeds against the implementation but fails against the overloads await Async.forEachAsync( iterable, async (item: TEntry, arrayIndex: number): Promise => { @@ -214,7 +249,7 @@ export class Async { public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, - options?: IAsyncParallelismOptions + options?: (IAsyncParallelismOptions & { weighted?: false }) | undefined ): Promise; /** @@ -242,7 +277,7 @@ export class Async { * from the array * @param options - options for customizing the control flow */ - public static async forEachAsync( + public static async forEachAsync( iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { weighted: true } @@ -287,7 +322,7 @@ export class Async { } } - public static validateWeightedIterable(operation: IWeightedIterable): void { + public static validateWeightedIterable(operation: IWeighted): void { if (operation.weight < 0) { throw new Error('Weight must be a whole number greater than or equal to 0'); } diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index ad57ffe89f..e7c7a31e7a 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -8,13 +8,7 @@ */ export { AlreadyReportedError } from './AlreadyReportedError'; -export { - Async, - AsyncQueue, - IAsyncParallelismOptions, - IRunWithRetriesOptions, - IWeightedIterable -} from './Async'; +export { Async, AsyncQueue, IAsyncParallelismOptions, IRunWithRetriesOptions, IWeighted } from './Async'; export { Brand } from './PrimitiveTypes'; export { FileConstants, FolderConstants } from './Constants'; export { Enum } from './Enum'; From 0598f0d35d1fb573f5237a5111cfe5288feeb966 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey Date: Fri, 3 May 2024 18:18:52 -0400 Subject: [PATCH 27/28] fix api report --- common/reviews/api/node-core-library.api.md | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index c73301e02c..3fe1f4ec65 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -25,23 +25,19 @@ export class AlreadyReportedError extends Error { // @public export class Async { - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: { - concurrency: number; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: (IAsyncParallelismOptions & { weighted?: false; - } | undefined): Promise; - static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: { - concurrency: number; + }) | undefined): Promise; + static forEachAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { weighted: true; }): Promise; static getSignal(): [Promise, () => void, (err: Error) => void]; - static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: { - concurrency: number; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options?: (IAsyncParallelismOptions & { weighted?: false; - } | undefined): Promise; - static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: { - concurrency: number; + }) | undefined): Promise; + static mapAsync(iterable: Iterable | AsyncIterable, callback: (entry: TEntry, arrayIndex: number) => Promise, options: IAsyncParallelismOptions & { weighted: true; - } | undefined): Promise; + }): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; // (undocumented) From a52606799c7901271e047855e1b7d202155c4706 Mon Sep 17 00:00:00 2001 From: Aramis Sennyey <159921952+aramissennyeydd@users.noreply.github.com> Date: Sat, 4 May 2024 10:34:28 -0400 Subject: [PATCH 28/28] Apply suggestions from code review Co-authored-by: Ian Clanton-Thuon --- common/reviews/api/node-core-library.api.md | 1 - libraries/node-core-library/src/Async.ts | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index 3fe1f4ec65..f850c8bef6 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -40,7 +40,6 @@ export class Async { }): Promise; static runWithRetriesAsync({ action, maxRetries, retryDelayMs }: IRunWithRetriesOptions): Promise; static sleep(ms: number): Promise; - // (undocumented) static validateWeightedIterable(operation: IWeighted): void; } diff --git a/libraries/node-core-library/src/Async.ts b/libraries/node-core-library/src/Async.ts index 6d1267e51e..400cb8e00d 100644 --- a/libraries/node-core-library/src/Async.ts +++ b/libraries/node-core-library/src/Async.ts @@ -39,7 +39,7 @@ export interface IRunWithRetriesOptions { /** * @remarks - * Used with {@link (Async:class).(forEachAsync:2)}. + * Used with {@link (Async:class).(forEachAsync:2)} and {@link (Async:class).(mapAsync:2)}. * * @public */ @@ -322,6 +322,10 @@ export class Async { } } + /** + * Ensures that the argument is a valid {@link IWeighted}, with a `weight` argument that + * is a positive integer or 0. + */ public static validateWeightedIterable(operation: IWeighted): void { if (operation.weight < 0) { throw new Error('Weight must be a whole number greater than or equal to 0');