Skip to content

Commit

Permalink
Rewrite to use async generator and improve tests
Browse files Browse the repository at this point in the history
Signed-off-by: Richie Bendall <richiebendall@gmail.com>
  • Loading branch information
Richienb committed Nov 24, 2023
1 parent 518e47e commit 63d99e0
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 107 deletions.
10 changes: 10 additions & 0 deletions assert-in-range.js
@@ -0,0 +1,10 @@
import chalk from 'chalk';
import inRange from 'in-range';

export default function assertInRange(t, value, {start = 0, end}) {
if (inRange(value, {start, end})) {
t.pass();
} else {
t.fail(`${start} ${start <= value ? '≤' : chalk.red('≰')} ${chalk.yellow(value)} ${value <= end ? '≤' : chalk.red('≰')} ${end}`);
}
}
133 changes: 36 additions & 97 deletions index.js
Expand Up @@ -217,122 +217,61 @@ export function pMapIterable(
}

return {
[Symbol.asyncIterator]() {
let isDone = false;
const pendingQueue = [];
const waitingQueue = [];
const valueQueue = [];
const valuePromises = [];

async * [Symbol.asyncIterator]() {
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();

function tryToFlushWaitingQueue() {
while (waitingQueue[0]) {
const result = waitingQueue.shift();
const promises = [];
let runningMappersCount = 0;
let isDone = false;

if (valuePromises.length > 0) {
const {resolve, reject} = valuePromises.shift();
async function spawn() {
const {done, value} = await iterator.next();

if (result.done) {
resolve({done: true});
} else if (result.error) {
reject(result.error);
} else {
resolve({done: false, value: result.value});
}
} else {
valueQueue.push(result);
}
if (done) {
return {done: true};
}
}

async function tryToContinue() {
while (pendingQueue.length < concurrency && valueQueue.length + waitingQueue.length + pendingQueue.length < backpressure && !isDone) {
try {
const {done, value} = await iterator.next(); // eslint-disable-line no-await-in-loop
runningMappersCount++;

if (done) {
isDone = true;
waitingQueue[pendingQueue.length] = {done: true};
tryToFlushWaitingQueue();
// Spawn if still below concurrency and backpressure limit
trySpawn();

return;
}

const promise = (async () => {
try {
const result = await mapper(value);

const index = pendingQueue.indexOf(promise);

pendingQueue.splice(index, 1);
tryToContinue();

if (result === pMapSkip) {
waitingQueue.splice(index, 1);
} else {
waitingQueue[index] = {value: result};
}
} catch (error) {
const index = pendingQueue.indexOf(promise);

pendingQueue.splice(index);

waitingQueue[index] = {error};

isDone = true;
waitingQueue[index + 1] = {done: true};
} finally {
tryToFlushWaitingQueue();
}
})();
try {
const returnValue = await mapper(value);

pendingQueue.push(promise);
} catch (error) {
waitingQueue[pendingQueue.length] = {error};
runningMappersCount--;

isDone = true;
waitingQueue[pendingQueue.length + 1] = {done: true};
// Spawn if still below backpressure limit and just dropped below concurrency limit
trySpawn();

tryToFlushWaitingQueue();
}
return {done: false, value: returnValue};
} catch (error) {
isDone = true;
return {error};
}
}

tryToContinue();

return {
async next() {
if (isDone && pendingQueue.length === 0 && waitingQueue.length === 0 && valueQueue.length === 0) {
return {done: true};
}

if (valueQueue.length > 0) {
const {done, value, error} = valueQueue.shift();
function trySpawn() {
if (!isDone && runningMappersCount < concurrency && promises.length < backpressure) {
promises.push(spawn());
}
}

tryToContinue();
trySpawn();

if (done) {
return {done: true};
}
while (promises.length > 0) {
const {error, done, value} = await promises.shift(); // eslint-disable-line no-await-in-loop

if (error) {
throw error;
}

return {done: false, value};
}
if (error) {
throw error;
}

return new Promise((resolve, reject) => {
valuePromises.push({resolve, reject});
});
},
async return() {
isDone = true;
if (done) {
return;
}

return {done: true};
},
};
yield value;
}
},
};
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -42,6 +42,7 @@
],
"devDependencies": {
"ava": "^5.2.0",
"chalk": "^5.3.0",
"delay": "^5.0.0",
"in-range": "^3.0.0",
"random-int": "^3.0.0",
Expand Down
4 changes: 2 additions & 2 deletions test-multiple-pmapskips-performance.js
@@ -1,6 +1,6 @@
import test from 'ava';
import inRange from 'in-range';
import timeSpan from 'time-span';
import assertInRange from './assert-in-range.js';
import pMap, {pMapSkip} from './index.js';

function generateSkipPerformanceData(length) {
Expand Down Expand Up @@ -32,6 +32,6 @@ test('multiple pMapSkips - algorithmic complexity', async t => {
// shorter test. This is not perfect... there is some fluctuation.
// The idea here is to catch a regression that makes `pMapSkip` handling O(n^2)
// on the number of `pMapSkip` items in the input.
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
assertInRange(t, longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration});
}
});
49 changes: 41 additions & 8 deletions test.js
@@ -1,8 +1,8 @@
import test from 'ava';
import delay from 'delay';
import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import assertInRange from './assert-in-range.js';
import pMap, {pMapIterable, pMapSkip} from './index.js';

const sharedInput = [
Expand All @@ -11,6 +11,14 @@ const sharedInput = [
[30, 100],
];

const longerSharedInput = [
[10, 300],
[20, 200],
[30, 100],
[40, 50],
[50, 25],
];

const errorInput1 = [
[20, 200],
[30, 100],
Expand Down Expand Up @@ -76,9 +84,9 @@ class ThrowingIterator {
index++;
this.index = index;
}
// eslint is wrong - bind is needed else the next() call cannot update
// this.index, which we need to track how many times the iterator was called
// eslint-disable-next-line no-extra-bind
// eslint is wrong - bind is needed else the next() call cannot update
// this.index, which we need to track how many times the iterator was called
// eslint-disable-next-line no-extra-bind
}).bind(this),
};
}
Expand All @@ -89,13 +97,13 @@ test('main', async t => {
t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]);

// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload.
t.true(inRange(end(), {start: 290, end: 430}));
assertInRange(t, end(), {start: 290, end: 430});
});

test('concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await pMap(sharedInput, mapper, {concurrency: 1}), [10, 20, 30]);
t.true(inRange(end(), {start: 590, end: 760}));
assertInRange(t, end(), {start: 590, end: 760});
});

test('concurrency: 4', async t => {
Expand Down Expand Up @@ -225,13 +233,13 @@ test('asyncIterator - main', async t => {
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper), [10, 20, 30]);

// We give it some leeway on both sides of the expected 300ms as the exact value depends on the machine and workload.
t.true(inRange(end(), {start: 290, end: 430}));
assertInRange(t, end(), {start: 290, end: 430});
});

test('asyncIterator - concurrency: 1', async t => {
const end = timeSpan();
t.deepEqual(await pMap(new AsyncTestData(sharedInput), mapper, {concurrency: 1}), [10, 20, 30]);
t.true(inRange(end(), {start: 590, end: 760}));
assertInRange(t, end(), {start: 590, end: 760});
});

test('asyncIterator - concurrency: 4', async t => {
Expand Down Expand Up @@ -554,3 +562,28 @@ test('pMapIterable - stop on error', async t => {

t.deepEqual(output, [20]);
});

test('pMapIterable - concurrency', async t => {
const end = timeSpan();
t.deepEqual(await collectAsyncIterable(pMapIterable(sharedInput, mapper, {concurrency: 1})), [10, 20, 30]);

// It could've only taken this much time if each were run in series
assertInRange(t, end(), {start: 590, end: 760});
});

test('pMapIterable - backpressure', async t => {
let currentValue;

// Concurrency option is forced by an early check
const {value} = await pMapIterable(longerSharedInput, async value => {
currentValue = await mapper(value);
return currentValue;
}, {backpressure: 2, concurrency: 2})[Symbol.asyncIterator]().next();

t.is(value, 10);

// If backpressure is not respected, than all items will be evaluated in this time
await delay(600);

t.is(currentValue, 30);
});

0 comments on commit 63d99e0

Please sign in to comment.