diff --git a/packages/react/src/executors/module-federation-dev-server/module-federation-dev-server.impl.ts b/packages/react/src/executors/module-federation-dev-server/module-federation-dev-server.impl.ts index d70b46b75f375..663c5ddbdd7ac 100644 --- a/packages/react/src/executors/module-federation-dev-server/module-federation-dev-server.impl.ts +++ b/packages/react/src/executors/module-federation-dev-server/module-federation-dev-server.impl.ts @@ -1,8 +1,12 @@ -import { ExecutorContext, runExecutor } from '@nrwl/devkit'; +import { ExecutorContext, logger, runExecutor } from '@nrwl/devkit'; import devServerExecutor, { WebDevServerOptions, } from '@nrwl/web/src/executors/dev-server/dev-server.impl'; import { join } from 'path'; +import { + combineAsyncIterators, + tapAsyncIterator, +} from '../../utils/async-iterator'; type ModuleFederationDevServerOptions = WebDevServerOptions & { devRemotes?: string | string[]; @@ -57,61 +61,13 @@ export default async function* moduleFederationDevServer( ); } - return yield* iter; -} - -// TODO(jack): Extract this helper -function getNextAsyncIteratorFactory(options) { - return async (asyncIterator, index) => { - try { - const iterator = await asyncIterator.next(); - - return { index, iterator }; - } catch (err) { - if (options.errorCallback) { - options.errorCallback(err, index); - } - if (options.throwError !== false) { - return Promise.reject(err); - } - - return { index, iterator: { done: true } }; - } - }; -} - -async function* combineAsyncIterators( - ...iterators: { 0: AsyncIterator } & AsyncIterator[] -) { - let [options] = iterators; - if (typeof options.next === 'function') { - options = Object.create(null); - } else { - iterators.shift(); - } - - const getNextAsyncIteratorValue = getNextAsyncIteratorFactory(options); - - try { - const asyncIteratorsValues = new Map( - iterators.map((it, idx) => [idx, getNextAsyncIteratorValue(it, idx)]) - ); - - do { - const { iterator, index } = await Promise.race( - asyncIteratorsValues.values() + let numAwaiting = knownRemotes.length + 1; // remotes + host + return yield* tapAsyncIterator(iter, (x) => { + numAwaiting--; + if (numAwaiting === 0) { + logger.info( + `Host is ready: ${options.host ?? 'localhost'}:${options.port ?? 4200}` ); - if (iterator.done) { - asyncIteratorsValues.delete(index); - } else { - yield iterator.value; - asyncIteratorsValues.set( - index, - getNextAsyncIteratorValue(iterators[index], index) - ); - } - } while (asyncIteratorsValues.size > 0); - } finally { - await Promise.allSettled(iterators.map((it) => it.return())); - } + } + }); } diff --git a/packages/react/src/utils/async-iterator.spec.ts b/packages/react/src/utils/async-iterator.spec.ts new file mode 100644 index 0000000000000..7ba53d2d7ffd4 --- /dev/null +++ b/packages/react/src/utils/async-iterator.spec.ts @@ -0,0 +1,100 @@ +import { + mapAsyncIterator, + combineAsyncIterators, + tapAsyncIterator, +} from './async-iterator'; + +function delay(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe('combineAsyncIterators', () => { + it('should merge iterators', async () => { + async function* a() { + await delay(20); + yield 'a'; + } + + async function* b() { + await delay(0); + yield 'b'; + } + + const c = combineAsyncIterators(a(), b()); + const results = []; + + for await (const x of c) { + results.push(x); + } + + expect(results).toEqual(['b', 'a']); + }); + + it('should throw when one iterator throws', async () => { + async function* a() { + await delay(20); + yield 'a'; + } + + async function* b() { + throw new Error('threw in b'); + } + + const c = combineAsyncIterators(a(), b()); + + async function* d() { + yield* c; + } + + try { + for await (const x of d()) { + } + throw new Error('should not reach here'); + } catch (e) { + expect(e.message).toMatch(/threw in b/); + } + }); +}); + +describe('mapAsyncIterator', () => { + it('should map over values', async () => { + async function* f() { + yield 1; + yield 2; + yield 3; + } + + const c = mapAsyncIterator(f(), (x) => x * 2); + const results = []; + + for await (const x of c) { + results.push(x); + } + + expect(results).toEqual([2, 4, 6]); + }); +}); + +describe('tapAsyncIterator', () => { + it('should tap values', async () => { + async function* f() { + yield 1; + yield 2; + yield 3; + } + + const tapped = []; + const results = []; + + const c = tapAsyncIterator(f(), (x) => { + tapped.push(`tap: ${x}`); + }); + + for await (const x of c) { + results.push(x); + } + + expect(tapped).toEqual(['tap: 1', 'tap: 2', 'tap: 3']); + expect(results).toEqual([1, 2, 3]); + }); +}); diff --git a/packages/react/src/utils/async-iterator.ts b/packages/react/src/utils/async-iterator.ts new file mode 100644 index 0000000000000..366ebbb21617b --- /dev/null +++ b/packages/react/src/utils/async-iterator.ts @@ -0,0 +1,78 @@ +export async function* combineAsyncIterators( + ...iterators: { 0: AsyncIterableIterator } & AsyncIterableIterator[] +) { + let [options] = iterators; + if (typeof options.next === 'function') { + options = Object.create(null); + } else { + iterators.shift(); + } + + const getNextAsyncIteratorValue = getNextAsyncIteratorFactory(options); + + try { + const asyncIteratorsValues = new Map( + iterators.map((it, idx) => [idx, getNextAsyncIteratorValue(it, idx)]) + ); + + do { + const { iterator, index } = await Promise.race( + asyncIteratorsValues.values() + ); + if (iterator.done) { + asyncIteratorsValues.delete(index); + } else { + yield iterator.value; + asyncIteratorsValues.set( + index, + getNextAsyncIteratorValue(iterators[index], index) + ); + } + } while (asyncIteratorsValues.size > 0); + } finally { + await Promise.allSettled(iterators.map((it) => it.return())); + } +} + +function getNextAsyncIteratorFactory(options) { + return async (asyncIterator, index) => { + try { + const iterator = await asyncIterator.next(); + + return { index, iterator }; + } catch (err) { + if (options.errorCallback) { + options.errorCallback(err, index); + } + return Promise.reject(err); + } + }; +} + +export async function* mapAsyncIterator( + data: AsyncIterableIterator, + transform: (input: I, index?: number, data?: AsyncIterableIterator) => O +) { + async function* f() { + const generator = data[Symbol.asyncIterator] || data[Symbol.iterator]; + const iterator = generator.call(data); + let index = 0; + let item = await iterator.next(); + while (!item.done) { + yield await transform(await item.value, index, data); + index++; + item = await iterator.next(); + } + } + return yield* f(); +} + +export async function* tapAsyncIterator( + data: AsyncIterableIterator, + fn: (input: I) => void +) { + return yield* mapAsyncIterator(data, (x) => { + fn(x); + return x; + }); +} diff --git a/packages/web/src/executors/file-server/file-server.impl.ts b/packages/web/src/executors/file-server/file-server.impl.ts index 2d452d8f7e491..8c1dc10a39550 100644 --- a/packages/web/src/executors/file-server/file-server.impl.ts +++ b/packages/web/src/executors/file-server/file-server.impl.ts @@ -1,4 +1,5 @@ import { execFileSync, fork } from 'child_process'; +import * as chalk from 'chalk'; import { ExecutorContext, joinPathFragments, @@ -131,8 +132,13 @@ export default async function* fileServerExecutor( execFileSync(pmCmd, args, { stdio: [0, 1, 2], }); - } catch {} - running = false; + } catch { + throw new Error( + `Build target failed: ${chalk.bold(options.buildTarget)}` + ); + } finally { + running = false; + } } };