Skip to content

Commit

Permalink
Update index.js
Browse files Browse the repository at this point in the history
  • Loading branch information
Richienb committed Dec 5, 2023
1 parent 7e68c18 commit 408b371
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions index.js
Expand Up @@ -195,44 +195,46 @@ export function pMapIterable(
let isDone = false;

function trySpawn() {
if (!isDone && runningMappersCount < concurrency && promises.length < backpressure) {
const promise = (async () => {
const {done, value} = await iterator.next();
if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
return;
}

if (done) {
return {done: true};
}
const promise = (async () => {
const {done, value} = await iterator.next();

runningMappersCount++;
if (done) {
return {done: true};
}

// Spawn if still below concurrency and backpressure limit
trySpawn();
runningMappersCount++;

try {
const returnValue = await mapper(value);
// Spawn if still below concurrency and backpressure limit
trySpawn();

try {
const returnValue = await mapper(value);

runningMappersCount--;
runningMappersCount--;

if (returnValue === pMapSkip) {
const index = promises.indexOf(promise);
if (returnValue === pMapSkip) {
const index = promises.indexOf(promise);

if (index > 0) {
promises.splice(index, 1);
}
if (index > 0) {
promises.splice(index, 1);
}
}

// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();
// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();

return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
})();
return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
})();

promises.push(promise);
}
promises.push(promise);
}

trySpawn();
Expand Down

0 comments on commit 408b371

Please sign in to comment.