Skip to content

Commit

Permalink
fix(react): host server exits when one of the remotes fail (#10185)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysoo committed May 6, 2022
1 parent 5dc5429 commit c4e5b7c
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 59 deletions.
@@ -1,8 +1,12 @@
import { ExecutorContext, runExecutor } from '@nrwl/devkit';
import { ExecutorContext, logger, runExecutor } from '@nrwl/devkit';
import devServerExecutor, {
WebDevServerOptions,
} from '@nrwl/web/src/executors/dev-server/dev-server.impl';
import { join } from 'path';
import {
combineAsyncIterators,
tapAsyncIterator,
} from '../../utils/async-iterator';

type ModuleFederationDevServerOptions = WebDevServerOptions & {
devRemotes?: string | string[];
Expand Down Expand Up @@ -57,61 +61,13 @@ export default async function* moduleFederationDevServer(
);
}

return yield* iter;
}

// TODO(jack): Extract this helper
function getNextAsyncIteratorFactory(options) {
return async (asyncIterator, index) => {
try {
const iterator = await asyncIterator.next();

return { index, iterator };
} catch (err) {
if (options.errorCallback) {
options.errorCallback(err, index);
}
if (options.throwError !== false) {
return Promise.reject(err);
}

return { index, iterator: { done: true } };
}
};
}

async function* combineAsyncIterators(
...iterators: { 0: AsyncIterator<any> } & AsyncIterator<any>[]
) {
let [options] = iterators;
if (typeof options.next === 'function') {
options = Object.create(null);
} else {
iterators.shift();
}

const getNextAsyncIteratorValue = getNextAsyncIteratorFactory(options);

try {
const asyncIteratorsValues = new Map(
iterators.map((it, idx) => [idx, getNextAsyncIteratorValue(it, idx)])
);

do {
const { iterator, index } = await Promise.race(
asyncIteratorsValues.values()
let numAwaiting = knownRemotes.length + 1; // remotes + host
return yield* tapAsyncIterator(iter, (x) => {
numAwaiting--;
if (numAwaiting === 0) {
logger.info(
`Host is ready: ${options.host ?? 'localhost'}:${options.port ?? 4200}`
);
if (iterator.done) {
asyncIteratorsValues.delete(index);
} else {
yield iterator.value;
asyncIteratorsValues.set(
index,
getNextAsyncIteratorValue(iterators[index], index)
);
}
} while (asyncIteratorsValues.size > 0);
} finally {
await Promise.allSettled(iterators.map((it) => it.return()));
}
}
});
}
100 changes: 100 additions & 0 deletions packages/react/src/utils/async-iterator.spec.ts
@@ -0,0 +1,100 @@
import {
mapAsyncIterator,
combineAsyncIterators,
tapAsyncIterator,
} from './async-iterator';

function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

describe('combineAsyncIterators', () => {
it('should merge iterators', async () => {
async function* a() {
await delay(20);
yield 'a';
}

async function* b() {
await delay(0);
yield 'b';
}

const c = combineAsyncIterators(a(), b());
const results = [];

for await (const x of c) {
results.push(x);
}

expect(results).toEqual(['b', 'a']);
});

it('should throw when one iterator throws', async () => {
async function* a() {
await delay(20);
yield 'a';
}

async function* b() {
throw new Error('threw in b');
}

const c = combineAsyncIterators(a(), b());

async function* d() {
yield* c;
}

try {
for await (const x of d()) {
}
throw new Error('should not reach here');
} catch (e) {
expect(e.message).toMatch(/threw in b/);
}
});
});

describe('mapAsyncIterator', () => {
it('should map over values', async () => {
async function* f() {
yield 1;
yield 2;
yield 3;
}

const c = mapAsyncIterator(f(), (x) => x * 2);
const results = [];

for await (const x of c) {
results.push(x);
}

expect(results).toEqual([2, 4, 6]);
});
});

describe('tapAsyncIterator', () => {
it('should tap values', async () => {
async function* f() {
yield 1;
yield 2;
yield 3;
}

const tapped = [];
const results = [];

const c = tapAsyncIterator(f(), (x) => {
tapped.push(`tap: ${x}`);
});

for await (const x of c) {
results.push(x);
}

expect(tapped).toEqual(['tap: 1', 'tap: 2', 'tap: 3']);
expect(results).toEqual([1, 2, 3]);
});
});
78 changes: 78 additions & 0 deletions packages/react/src/utils/async-iterator.ts
@@ -0,0 +1,78 @@
export async function* combineAsyncIterators(
...iterators: { 0: AsyncIterableIterator<any> } & AsyncIterableIterator<any>[]
) {
let [options] = iterators;
if (typeof options.next === 'function') {
options = Object.create(null);
} else {
iterators.shift();
}

const getNextAsyncIteratorValue = getNextAsyncIteratorFactory(options);

try {
const asyncIteratorsValues = new Map(
iterators.map((it, idx) => [idx, getNextAsyncIteratorValue(it, idx)])
);

do {
const { iterator, index } = await Promise.race(
asyncIteratorsValues.values()
);
if (iterator.done) {
asyncIteratorsValues.delete(index);
} else {
yield iterator.value;
asyncIteratorsValues.set(
index,
getNextAsyncIteratorValue(iterators[index], index)
);
}
} while (asyncIteratorsValues.size > 0);
} finally {
await Promise.allSettled(iterators.map((it) => it.return()));
}
}

function getNextAsyncIteratorFactory(options) {
return async (asyncIterator, index) => {
try {
const iterator = await asyncIterator.next();

return { index, iterator };
} catch (err) {
if (options.errorCallback) {
options.errorCallback(err, index);
}
return Promise.reject(err);
}
};
}

export async function* mapAsyncIterator<T = any, I = any, O = any>(
data: AsyncIterableIterator<T>,
transform: (input: I, index?: number, data?: AsyncIterableIterator<T>) => O
) {
async function* f() {
const generator = data[Symbol.asyncIterator] || data[Symbol.iterator];
const iterator = generator.call(data);
let index = 0;
let item = await iterator.next();
while (!item.done) {
yield await transform(await item.value, index, data);
index++;
item = await iterator.next();
}
}
return yield* f();
}

export async function* tapAsyncIterator<T = any, I = any, O = any>(
data: AsyncIterableIterator<T>,
fn: (input: I) => void
) {
return yield* mapAsyncIterator(data, (x) => {
fn(x);
return x;
});
}
10 changes: 8 additions & 2 deletions packages/web/src/executors/file-server/file-server.impl.ts
@@ -1,4 +1,5 @@
import { execFileSync, fork } from 'child_process';
import * as chalk from 'chalk';
import {
ExecutorContext,
joinPathFragments,
Expand Down Expand Up @@ -131,8 +132,13 @@ export default async function* fileServerExecutor(
execFileSync(pmCmd, args, {
stdio: [0, 1, 2],
});
} catch {}
running = false;
} catch {
throw new Error(
`Build target failed: ${chalk.bold(options.buildTarget)}`
);
} finally {
running = false;
}
}
};

Expand Down

0 comments on commit c4e5b7c

Please sign in to comment.