Skip to content

Commit

Permalink
Require Node.js 16
Browse files Browse the repository at this point in the history
  • Loading branch information
sindresorhus committed May 26, 2023
1 parent eb4e1aa commit ebbad60
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 159 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ jobs:
fail-fast: false
matrix:
node-version:
- 14
- 12
- 20
- 18
- 16
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v2
- uses: actions/checkout@v3
- uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
- run: npm install
Expand Down
7 changes: 4 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Readable as ReadableStream} from 'stream';
import {type Readable as ReadableStream} from 'node:stream';
import {type Buffer} from 'node:buffer';

export type Input =
| Buffer
Expand All @@ -20,7 +21,7 @@ declare const intoStream: {
Convert `input` into a stream. Adheres to the requested chunk size, except for `array` where each element will be a chunk.
@param input - The input to convert to a stream.
@returns A [readable stream](https://nodejs.org/api/stream.html#stream_class_stream_readable).
@returns A [readable stream](https://nodejs.org/api/stream.html#class-streamreadable).
@example
```
Expand All @@ -36,7 +37,7 @@ declare const intoStream: {
Convert object `input` into a stream.
@param input - The object input to convert to a stream.
@returns A [readable object stream](https://nodejs.org/api/stream.html#stream_object_mode).
@returns A [readable object stream](https://nodejs.org/api/stream.html#object-mode).
*/
object: (input: ObjectInput | Promise<ObjectInput>) => ReadableStream;
};
Expand Down
146 changes: 32 additions & 114 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,135 +1,53 @@
import from2 from 'from2';
import isPromise from 'p-is-promise';
import {Readable as ReadableStream} from 'node:stream';
import {Buffer} from 'node:buffer';

export default function intoStream(input) {
if (Array.isArray(input)) {
input = input.slice();
}

let promise;
let iterator;
let asyncIterator;
function baseIntoStream(isObjectMode, input) {
async function * reader() {
let value = await input;

prepare(input);
if (!value) {
return;
}

function prepare(value) {
input = value;
if (Array.isArray(value)) {
value = [...value];
}

if (
input instanceof ArrayBuffer ||
(ArrayBuffer.isView(input) && !Buffer.isBuffer(input))
!isObjectMode
&& (
value instanceof ArrayBuffer
|| (ArrayBuffer.isView(value) && !Buffer.isBuffer(value))
)
) {
input = Buffer.from(input);
value = Buffer.from(value);
}

promise = isPromise(input) ? input : null;

// We don't iterate on strings and buffers since slicing them is ~7x faster
const shouldIterate = !promise && input[Symbol.iterator] && typeof input !== 'string' && !Buffer.isBuffer(input);
iterator = shouldIterate ? input[Symbol.iterator]() : null;

const shouldAsyncIterate = !promise && input[Symbol.asyncIterator];
asyncIterator = shouldAsyncIterate ? input[Symbol.asyncIterator]() : null;
}

return from2(function reader(size, callback) {
if (promise) {
(async () => {
try {
await prepare(await promise);
reader.call(this, size, callback);
} catch (error) {
callback(error);
}
})();

return;
}
// We don't iterate on strings and buffers since yielding them is ~7x faster.
if (typeof value !== 'string' && !Buffer.isBuffer(value) && value?.[Symbol.iterator]) {
for (const element of value) {
yield element;
}

if (iterator) {
const object = iterator.next();
setImmediate(callback, null, object.done ? null : object.value);
return;
}

if (asyncIterator) {
(async () => {
try {
const object = await asyncIterator.next();
setImmediate(callback, null, object.done ? null : object.value);
} catch (error) {
setImmediate(callback, error);
}
})();
if (value?.[Symbol.asyncIterator]) {
for await (const element of value) {
yield await element;
}

return;
}

if (input.length === 0) {
setImmediate(callback, null, null);
return;
}

const chunk = input.slice(0, size);
input = input.slice(size);

setImmediate(callback, null, chunk);
});
}

intoStream.object = input => {
if (Array.isArray(input)) {
input = input.slice();
}

let promise;
let iterator;
let asyncIterator;

prepare(input);

function prepare(value) {
input = value;
promise = isPromise(input) ? input : null;
iterator = !promise && input[Symbol.iterator] ? input[Symbol.iterator]() : null;
asyncIterator = !promise && input[Symbol.asyncIterator] ? input[Symbol.asyncIterator]() : null;
yield value;
}

return from2.obj(function reader(size, callback) {
if (promise) {
(async () => {
try {
await prepare(await promise);
reader.call(this, size, callback);
} catch (error) {
callback(error);
}
})();

return;
}

if (iterator) {
const object = iterator.next();
setImmediate(callback, null, object.done ? null : object.value);
return;
}

if (asyncIterator) {
(async () => {
try {
const object = await asyncIterator.next();
setImmediate(callback, null, object.done ? null : object.value);
} catch (error) {
setImmediate(callback, error);
}
})();
return ReadableStream.from(reader(), {objectMode: isObjectMode});
}

return;
}
const intoStream = baseIntoStream.bind(undefined, false);

this.push(input);
export default intoStream;

setImmediate(callback, null, null);
});
};
intoStream.object = baseIntoStream.bind(undefined, true);
10 changes: 6 additions & 4 deletions index.test-d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import process from 'node:process';
import {Buffer} from 'node:buffer';
import intoStream from './index.js';

const unicornArray = 'unicorn'.split('');
const unicornArray = [...'unicorn'];

function asyncGeneratorFrom<T>(array: T[]) {
return async function * () {
Expand All @@ -10,7 +12,7 @@ function asyncGeneratorFrom<T>(array: T[]) {

function asyncIterableFrom<T>(array: T[]) {
return {
[Symbol.asyncIterator]: asyncGeneratorFrom(array)
[Symbol.asyncIterator]: asyncGeneratorFrom(array),
};
}

Expand All @@ -26,12 +28,12 @@ intoStream(Promise.resolve('unicorn')).pipe(process.stdout);
intoStream(Promise.resolve(unicornArray)).pipe(process.stdout);
intoStream(Promise.resolve(new Set(unicornArray))).pipe(process.stdout);
intoStream(Promise.resolve(new Set([Buffer.from('unicorn')]))).pipe(
process.stdout
process.stdout,
);
intoStream(Promise.resolve(Buffer.from('unicorn'))).pipe(process.stdout);
intoStream(Promise.resolve(Buffer.from('unicorn').buffer)).pipe(process.stdout);
intoStream(Promise.resolve(new Uint8Array(Buffer.from('unicorn').buffer))).pipe(
process.stdout
process.stdout,
);

intoStream(asyncGeneratorFrom(unicornArray)()).pipe(process.stdout);
Expand Down
15 changes: 5 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "module",
"exports": "./index.js",
"engines": {
"node": ">=12"
"node": ">=16"
},
"scripts": {
"test": "xo && ava && tsd"
Expand Down Expand Up @@ -43,16 +43,11 @@
"gulpfriendly",
"value"
],
"dependencies": {
"from2": "^2.3.0",
"p-is-promise": "^3.0.0"
},
"devDependencies": {
"ava": "^3.15.0",
"ava": "^5.3.0",
"get-stream": "^6.0.1",
"p-event": "^4.2.0",
"p-immediate": "^4.0.0",
"tsd": "^0.14.0",
"xo": "^0.38.2"
"p-event": "^5.0.1",
"tsd": "^0.28.1",
"xo": "^0.54.2"
}
}
12 changes: 5 additions & 7 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

> Convert a string/promise/array/iterable/asynciterable/buffer/typedarray/arraybuffer/object into a stream
Correctly chunks up the input and handles backpressure.
Correctly handles backpressure.

## Install

```
$ npm install into-stream
```sh
npm install into-stream
```

## Usage
Expand All @@ -24,14 +24,12 @@ intoStream('unicorn').pipe(process.stdout);
### intoStream(input)

Type: `Buffer | TypedArray | ArrayBuffer | string | Iterable<Buffer | string> | AsyncIterable<Buffer | string> | Promise`\
Returns: [Readable stream](https://nodejs.org/api/stream.html#stream_class_stream_readable)

Adheres to the requested chunk size, except for `array` where each element will be a chunk.
Returns: [Readable stream](https://nodejs.org/api/stream.html#class-streamreadable)

### intoStream.object(input)

Type: `object | Iterable<object> | AsyncIterable<object> | Promise`\
Returns: [Readable object stream](https://nodejs.org/api/stream.html#stream_object_mode)
Returns: [Readable object stream](https://nodejs.org/api/stream.html#object-mode)

## Related

Expand Down

0 comments on commit ebbad60

Please sign in to comment.