Skip to content

Commit

Permalink
Revert "fix(core): reject all promises in pool during shutdown (nrwl#…
Browse files Browse the repository at this point in the history
…22188)"

This reverts commit 84d96cc.
  • Loading branch information
FrozenPandaz committed Mar 8, 2024
1 parent e6dbe80 commit 1fb9bc8
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions packages/nx/src/project-graph/plugins/plugin-pool.ts
Expand Up @@ -13,15 +13,16 @@ const pool: Set<ChildProcess> = new Set();

const pidMap = new Map<number, { name: string; pending: Set<string> }>();

interface PromiseBankEntry {
promise: Promise<unknown>;
resolver: (result: any) => void;
rejector: (err: any) => void;
}

// transaction id (tx) -> Promise, Resolver, Rejecter
// Makes sure that we can resolve the correct promise when the worker sends back the result
const promiseBank = new Map<string, PromiseBankEntry>();
const promiseBank = new Map<
string,
{
promise: Promise<unknown>;
resolver: (result: any) => void;
rejecter: (err: any) => void;
}
>();

export function loadRemoteNxPlugin(plugin: PluginConfiguration, root: string) {
// this should only really be true when running unit tests within
Expand Down Expand Up @@ -70,14 +71,17 @@ export async function shutdownPluginWorkers() {

const pending = getPendingPromises(pool, pidMap);

for (const pendingPromise of pending) {
pendingPromise.rejector(new Error('Shutting down'));
if (pending.length > 0) {
// logger.verbose(
// `[plugin-pool] waiting for ${pending.length} pending operations to complete`
// );
await Promise.all(pending);
}

// logger.verbose(`[plugin-pool] all pending operations completed`);

for (const childProcess of pool) {
childProcess.kill('SIGINT');
for (const p of pool) {
p.kill('SIGINT');
}

// logger.verbose(`[plugin-pool] all workers killed`);
Expand Down Expand Up @@ -163,31 +167,31 @@ function createWorkerHandler(
}
},
createDependenciesResult: ({ tx, ...result }) => {
const { resolver, rejector } = promiseBank.get(tx);
const { resolver, rejecter } = promiseBank.get(tx);
if (result.success) {
resolver(result.dependencies);
} else if (result.success === false) {
rejector(result.error);
rejecter(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
},
createNodesResult: ({ tx, ...result }) => {
const { resolver, rejector } = promiseBank.get(tx);
const { resolver, rejecter } = promiseBank.get(tx);
if (result.success) {
resolver(result.result);
} else if (result.success === false) {
rejector(result.error);
rejecter(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
},
processProjectGraphResult: ({ tx, ...result }) => {
const { resolver, rejector } = promiseBank.get(tx);
const { resolver, rejecter } = promiseBank.get(tx);
if (result.success) {
resolver(result.graph);
} else if (result.success === false) {
rejector(result.error);
rejecter(result.error);
}
pidMap.get(worker.pid)?.pending.delete(tx);
promiseBank.delete(tx);
Expand All @@ -200,8 +204,8 @@ function createWorkerExitHandler(worker: ChildProcess) {
return () => {
if (!pluginWorkersShutdown) {
pidMap.get(worker.pid)?.pending.forEach((tx) => {
const { rejector } = promiseBank.get(tx);
rejector(
const { rejecter } = promiseBank.get(tx);
rejecter(
new Error(
`Plugin worker ${
pidMap.get(worker.pid).name ?? worker.pid
Expand All @@ -224,11 +228,11 @@ function getPendingPromises(
pool: Set<ChildProcess>,
pidMap: Map<number, { name: string; pending: Set<string> }>
) {
const pendingTxs: Array<PromiseBankEntry> = [];
const pendingTxs: Array<Promise<unknown>> = [];
for (const p of pool) {
const { pending } = pidMap.get(p.pid) ?? { pending: new Set() };
for (const tx of pending) {
pendingTxs.push(promiseBank.get(tx));
pendingTxs.push(promiseBank.get(tx)?.promise);
}
}
return pendingTxs;
Expand All @@ -239,11 +243,11 @@ function registerPendingPromise(
pending: Set<string>,
callback: () => void
): Promise<any> {
let resolver, rejector;
let resolver, rejecter;

const promise = new Promise((res, rej) => {
resolver = res;
rejector = rej;
rejecter = rej;

callback();
}).then((val) => {
Expand All @@ -257,7 +261,7 @@ function registerPendingPromise(
promiseBank.set(tx, {
promise,
resolver,
rejector,
rejecter,
});
return promise;
}

0 comments on commit 1fb9bc8

Please sign in to comment.