-
-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
eb4e1aa
commit ebbad60
Showing
7 changed files
with
84 additions
and
159 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,135 +1,53 @@ | ||
import from2 from 'from2'; | ||
import isPromise from 'p-is-promise'; | ||
import {Readable as ReadableStream} from 'node:stream'; | ||
import {Buffer} from 'node:buffer'; | ||
|
||
export default function intoStream(input) { | ||
if (Array.isArray(input)) { | ||
input = input.slice(); | ||
} | ||
|
||
let promise; | ||
let iterator; | ||
let asyncIterator; | ||
function baseIntoStream(isObjectMode, input) { | ||
async function * reader() { | ||
let value = await input; | ||
|
||
prepare(input); | ||
if (!value) { | ||
return; | ||
} | ||
|
||
function prepare(value) { | ||
input = value; | ||
if (Array.isArray(value)) { | ||
value = [...value]; | ||
} | ||
|
||
if ( | ||
input instanceof ArrayBuffer || | ||
(ArrayBuffer.isView(input) && !Buffer.isBuffer(input)) | ||
!isObjectMode | ||
&& ( | ||
value instanceof ArrayBuffer | ||
|| (ArrayBuffer.isView(value) && !Buffer.isBuffer(value)) | ||
) | ||
) { | ||
input = Buffer.from(input); | ||
value = Buffer.from(value); | ||
} | ||
|
||
promise = isPromise(input) ? input : null; | ||
|
||
// We don't iterate on strings and buffers since slicing them is ~7x faster | ||
const shouldIterate = !promise && input[Symbol.iterator] && typeof input !== 'string' && !Buffer.isBuffer(input); | ||
iterator = shouldIterate ? input[Symbol.iterator]() : null; | ||
|
||
const shouldAsyncIterate = !promise && input[Symbol.asyncIterator]; | ||
asyncIterator = shouldAsyncIterate ? input[Symbol.asyncIterator]() : null; | ||
} | ||
|
||
return from2(function reader(size, callback) { | ||
if (promise) { | ||
(async () => { | ||
try { | ||
await prepare(await promise); | ||
reader.call(this, size, callback); | ||
} catch (error) { | ||
callback(error); | ||
} | ||
})(); | ||
|
||
return; | ||
} | ||
// We don't iterate on strings and buffers since yielding them is ~7x faster. | ||
if (typeof value !== 'string' && !Buffer.isBuffer(value) && value?.[Symbol.iterator]) { | ||
for (const element of value) { | ||
yield element; | ||
} | ||
|
||
if (iterator) { | ||
const object = iterator.next(); | ||
setImmediate(callback, null, object.done ? null : object.value); | ||
return; | ||
} | ||
|
||
if (asyncIterator) { | ||
(async () => { | ||
try { | ||
const object = await asyncIterator.next(); | ||
setImmediate(callback, null, object.done ? null : object.value); | ||
} catch (error) { | ||
setImmediate(callback, error); | ||
} | ||
})(); | ||
if (value?.[Symbol.asyncIterator]) { | ||
for await (const element of value) { | ||
yield await element; | ||
} | ||
|
||
return; | ||
} | ||
|
||
if (input.length === 0) { | ||
setImmediate(callback, null, null); | ||
return; | ||
} | ||
|
||
const chunk = input.slice(0, size); | ||
input = input.slice(size); | ||
|
||
setImmediate(callback, null, chunk); | ||
}); | ||
} | ||
|
||
intoStream.object = input => { | ||
if (Array.isArray(input)) { | ||
input = input.slice(); | ||
} | ||
|
||
let promise; | ||
let iterator; | ||
let asyncIterator; | ||
|
||
prepare(input); | ||
|
||
function prepare(value) { | ||
input = value; | ||
promise = isPromise(input) ? input : null; | ||
iterator = !promise && input[Symbol.iterator] ? input[Symbol.iterator]() : null; | ||
asyncIterator = !promise && input[Symbol.asyncIterator] ? input[Symbol.asyncIterator]() : null; | ||
yield value; | ||
} | ||
|
||
return from2.obj(function reader(size, callback) { | ||
if (promise) { | ||
(async () => { | ||
try { | ||
await prepare(await promise); | ||
reader.call(this, size, callback); | ||
} catch (error) { | ||
callback(error); | ||
} | ||
})(); | ||
|
||
return; | ||
} | ||
|
||
if (iterator) { | ||
const object = iterator.next(); | ||
setImmediate(callback, null, object.done ? null : object.value); | ||
return; | ||
} | ||
|
||
if (asyncIterator) { | ||
(async () => { | ||
try { | ||
const object = await asyncIterator.next(); | ||
setImmediate(callback, null, object.done ? null : object.value); | ||
} catch (error) { | ||
setImmediate(callback, error); | ||
} | ||
})(); | ||
return ReadableStream.from(reader(), {objectMode: isObjectMode}); | ||
} | ||
|
||
return; | ||
} | ||
const intoStream = baseIntoStream.bind(undefined, false); | ||
|
||
this.push(input); | ||
export default intoStream; | ||
|
||
setImmediate(callback, null, null); | ||
}); | ||
}; | ||
intoStream.object = baseIntoStream.bind(undefined, true); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.