Skip to content

Commit

Permalink
Support pMapSkip and fix backpressure bug
Browse files Browse the repository at this point in the history
Signed-off-by: Richie Bendall <richiebendall@gmail.com>
  • Loading branch information
Richienb committed Nov 26, 2023
1 parent 60419f8 commit 7e68c18
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 29 deletions.
63 changes: 40 additions & 23 deletions index.js
Expand Up @@ -194,43 +194,53 @@ export function pMapIterable(
let runningMappersCount = 0;
let isDone = false;

async function spawn() {
const {done, value} = await iterator.next();
function trySpawn() {
if (!isDone && runningMappersCount < concurrency && promises.length < backpressure) {
const promise = (async () => {
const {done, value} = await iterator.next();

if (done) {
return {done: true};
}
if (done) {
return {done: true};
}

runningMappersCount++;
runningMappersCount++;

// Spawn if still below concurrency and backpressure limit
trySpawn();
// Spawn if still below concurrency and backpressure limit
trySpawn();

try {
const returnValue = await mapper(value);
try {
const returnValue = await mapper(value);

runningMappersCount--;
runningMappersCount--;

// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();
if (returnValue === pMapSkip) {
const index = promises.indexOf(promise);

return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
}
if (index > 0) {
promises.splice(index, 1);
}
}

function trySpawn() {
if (!isDone && runningMappersCount < concurrency && promises.length < backpressure) {
promises.push(spawn());
// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();

return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
})();

promises.push(promise);
}
}

trySpawn();

while (promises.length > 0) {
const {error, done, value} = await promises.shift(); // eslint-disable-line no-await-in-loop
const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop

promises.shift();

if (error) {
throw error;
Expand All @@ -240,6 +250,13 @@ export function pMapIterable(
return;
}

// Spawn if just dropped below backpressure limit and below the concurrency limit
trySpawn();

if (value === pMapSkip) {
continue;
}

yield value;
}
},
Expand Down
36 changes: 30 additions & 6 deletions test.js
Expand Up @@ -565,33 +565,57 @@ test('pMapIterable - stop on error', async t => {

test('pMapIterable - concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper, {concurrency: 1})), [10, 20, 30]);
t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper, {concurrency: 1, backpressure: Number.POSITIVE_INFINITY})), [10, 20, 30]);

// It could've only taken this much time if each were run in series
assertInRange(t, end(), {start: 590, end: 760});
});

test('pMapIterable - concurrency: 2', async t => {
const times = new Map();
const end = timeSpan();

t.deepEqual(await collectAsyncIterable(pMapIterable(longerSharedInput, mapper, {concurrency: 2})), [10, 20, 30, 40, 50]);
t.deepEqual(await collectAsyncIterable(pMapIterable(longerSharedInput, value => {
times.set(value[0], end());
return mapper(value);
}, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY})), [10, 20, 30, 40, 50]);

assertInRange(t, end(), {start: 325, end: 375});
assertInRange(t, times.get(10), {start: 0, end: 50});
assertInRange(t, times.get(20), {start: 0, end: 50});
assertInRange(t, times.get(30), {start: 200, end: 250});
assertInRange(t, times.get(40), {start: 300, end: 350});
assertInRange(t, times.get(50), {start: 300, end: 350});
});

test('pMapIterable - backpressure', async t => {
let currentValue;

// Concurrency option is forced by an early check
const {value} = await pMapIterable(longerSharedInput, async value => {
const asyncIterator = pMapIterable(longerSharedInput, async value => {
currentValue = await mapper(value);
return currentValue;
}, {backpressure: 2, concurrency: 2})[Symbol.asyncIterator]().next();
}, {backpressure: 2, concurrency: 2})[Symbol.asyncIterator]();

t.is(value, 10);
const {value: value1} = await asyncIterator.next();
t.is(value1, 10);

// If backpressure is not respected, than all items will be evaluated in this time
await delay(600);

t.is(currentValue, 30);

const {value: value2} = await asyncIterator.next();
t.is(value2, 20);

await delay(100);

t.is(currentValue, 40);
});

test('pMapIterable - pMapSkip', async t => {
t.deepEqual(await collectAsyncIterable(pMapIterable([
1,
pMapSkip,
2,
], async value => value)), [1, 2]);
});

0 comments on commit 7e68c18

Please sign in to comment.