stream: add collect method #38975
Conversation
What happens if you call `collect()` with a `limit` and the stream contains more data than that?

Can we avoid rejecting? Is there a reason to reject rather than return the buffered amount?

In either case, it could just stop at/before the limit, but I think that would also require some kind of indication that the limit has been reached. Do you have a suggestion for how to give that info to calling code?
```js
const encoding = isObjectMode ? false : this._readableState.encoding;
for await (const chunk of this) {
  const chunkLen = isObjectMode ? 1 : chunk.length;
  if (len + chunkLen > limit) {
```
Shouldn't this push back the chunk part that is past the limit for other consumers to use? e.g.

```js
let buff = new TextEncoder().encode('Hello world!');
stream.write(buff);
await stream.collect(5); // ~ 'Hello'
await stream.collect(7); // ~ ' world!'
```

I think a test is fine, I'm not 100% familiar with the behavior in async iterator mode. Also, does this need to account for Unicode?
> Shouldn't this push back the chunk part that is past the limit for other consumers to use?

Maybe in a non-rejecting implementation like we're discussing elsewhere in this PR, yeah. Otherwise, "other consumers" don't exist.

What you're doing in the example here seems more like it should happen as an option to `.iterator()`. e.g.

```js
let buff = new TextEncoder().encode('Hello world!');
stream.write(buff);
let iter = stream.iterator({ limit: 5 });
(await iter.next()).value; // ~ 'Hello'
(await iter.next()).value; // ~ ' world!'
```

Or an async version of `.read([size])`.

> Also, does this need to account for Unicode?

I can add a test or two, but I think it doesn't matter? If encoding is set, then `StringDecoder` will do the needful behind the scenes, won't it?
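For what it's worth, here's a minimal sketch of the `StringDecoder`-backed behavior I mean, with a multibyte character deliberately split across chunks (the stream and split point are made up for illustration):

```js
import { PassThrough } from 'node:stream';

// '€' is 3 bytes in UTF-8; write it split across two chunks. With an
// encoding set, the stream's internal StringDecoder buffers the partial
// bytes until the character is complete.
const p = new PassThrough();
p.setEncoding('utf8');

const euro = Buffer.from('€'); // <Buffer e2 82 ac>
p.write(euro.subarray(0, 1));
p.write(euro.subarray(1));
p.end();

let out = '';
for await (const chunk of p) out += chunk;
console.log(out); // '€', not mangled partial bytes
```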
@bengl Using a stream as a tee/buffered stream is my concern. There isn't a clear guarantee that no other listeners are consuming the same stream, or will in the future, even if we reject.
We can optionally break out of the loop on the limit, allowing other listeners to get future chunks. This would be the `rejectAtLimit: false` option I propose below.
So a slight difference is that you need to push back the unused portion of the buffers being read. So if you only partially read "hello world!" for example, the subsequent `.collect()` calls can get the remainder rather than having it be dropped.
That should be easy enough with an `.unshift()` before the `break`. I'll do that and add tests as soon as I have a bit of spare time.
.. and that was this evening. PTAL at the latest changes.
@bengl I'm not sure I understand the practical need to know if it is at the end in most cases. If the limit is `Infinity`, you know it is the end. I think the majority of the time you either want to drain the whole stream or limit the consumption, usually not both.

The use case for limiting I had in mind is this comment and the subsequent comments. If you're limiting the amount of data because you can't/won't deal with large payloads, then you want to know when you've hit that limit so that you don't then proceed to try to decode it with the end missing.

@bengl Right now we don't close past the limit in this PR; was it intended to close the stream so that it doesn't continuously fill past the limit even after we reject? If so, that would be a more meaningful reason to reject at the limit by default. Perhaps an opt-out would be nice, though.
Hmm, how about this:

```js
// showing defaults
readable.collect({
  limit: Infinity,
  rejectAtLimit: true
})
```

Where `rejectAtLimit: false` resolves with whatever was collected under the limit instead of rejecting.
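In other words, something like this (hypothetical usage of the proposed options):

```js
// Default: reject once buffering would exceed the limit.
try {
  await readable.collect({ limit: 1024 });
} catch (err) {
  // limit exceeded
}

// Opt-out: resolve with what fit under the limit; the unconsumed
// remainder stays in the stream for other consumers.
const head = await readable.collect({ limit: 1024, rejectAtLimit: false });
```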
Sounds good to me; I'm particularly interested in things like reading a stream's headers and then only reading the body as needed, and/or piping a body to other places. If it does allow limits without rejecting, I can do that.
Question... why not simply implement the same methods and behaviors as defined by the Body mixin? Specifically...

```js
await readable.arrayBuffer(); // collect that resolves as an ArrayBuffer
await readable.text();        // collect that resolves as a string
await readable.blob();        // collect that resolves as a blob
```

For object mode streams, the async iterator approach would seem to be fine (and ideal). For some context... the new QUIC API that I'm working on now will support an API like the following...

```js
// "stream" here refers to a QUIC Stream, not a require('stream') stream.
// Yes, confusing, but...
session.onstream = async ({ stream, respondWith }) => {
  console.log(await stream.text());
  // or await stream.arrayBuffer()
  // or await stream.blob()
  // or stream.readable() (no await because it returns a stream)
  respondWith({ body: 'Hello World' });
};
```

(Specifically, I want to avoid adding a new Node.js-only API when a similar construct already exists)
@jasnell I think in particular the ability to have a limit is a different API than those.
Why not both! That's not a bad idea! That being said, there are differences between this implementation of `collect()` and those Body-mixin methods.
Yes. But is that limit necessary? Or, perhaps, is it the right way to handle it? This is difficult because I don't think the existing standardized APIs have any notion of a limit.

If we're designing a new API for this anyway, what I'd almost prefer to see is a progress callback + `AbortSignal`, e.g.

```js
const ac = new AbortController();
const limit = 100;
const results = await readable.collect({
  signal: ac.signal,
  progress: (read) => { if (read > limit) ac.abort(); },
});

// Or, by timeout...
setTimeout(() => ac.abort(), 10000);
const results = await readable.collect({
  signal: ac.signal,
});
```

The key question I have though is: do the requirements here justify adding a new API, or are the existing standardized APIs sufficient (even if less capable)?
I would expect those to return immediately rejected promises.
Eh, I can see the argument, but personally I could take it or leave it. Even so, that could be done with a `'data'` listener.
Fair points. If we decide that a new API is required, I'm certainly not opposed to `collect()`.
This seems fine, but my question is how it would differ from this with a `{ signal }` option.

Only in that it gives more flexibility in how it's handled. I could choose to simply log the overage with a warning... or simply track the progress of reading.

What does "track the progress" mean here? I'm confused about the API you are trying to discuss, since I don't know its exact feature set.
Literally something like...

```js
console.log(await readable.collect({
  progress(read) {
    console.log(`${read} bytes read`);
  }
}));
// 10 bytes read
// 20 bytes read
// 30 bytes read
// Buffer < ... >
```

Again, I'm putting this up for the sake of discussion. Is just having `limit` enough?
I'm not really interested in whether the limit is useful, but more in what features might be missing or useful.
To be fair, "progress" is something that could be implemented externally to all this, simply by registering an `on('data')` listener.
I don't think so, unless we expect aborts to also unshift the parts they don't need. For example, reading the first X bytes of a TAR is useful to see what headers are present. I'm unclear on how we would do that workflow with aborts.
I'm really excited about this method because it could allow things like:

```js
let headers = await tarStream.collect({ limit: HEADERS });
let BODY_SIZE = headers.readUint32BE(_);
await tarStream.collect({ limit: BODY_SIZE });
// continue
```

Having the ability to run until a delimiter could be useful, but it would likely be difficult to agree on what that means in this API, at least.
I implemented the `rejectAtLimit` option; PTAL.

@jasnell I'm still thinking through the non-limit use cases for `collect()`.

@bmeck In terms of a delimiter: I like the idea, but I think you're right that it requires a lot of hashing out. I think that might be better off dealt with in a future PR.
```js
const assert = require('assert');

const hello = Buffer.from('hello, ');
const world = Buffer.from('world');
```
Can you include multibyte characters in the test to make sure those are collected properly? (they are, but to make sure there are no regressions later)
```js
  limit = Infinity,
  rejectAtLimit = true
} = {}) {
  const bufs = [];
```
This needs validation for the options object... like...

```js
Readable.prototype.collect = async function(options = {}) {
  validateObject(options, 'options'); // from internal/validators
  const {
    limit = Infinity,
    rejectAtLimit = true,
  } = options;
  validateNumber(limit, 'options.limit');
  validateBoolean(rejectAtLimit, 'options.rejectAtLimit');
  // ...
};
```
```js
  len += chunkLen;
  bufs.push(chunk);
}
return isObjectMode ? bufs : encoding ? bufs.join('') : Buffer.concat(bufs);
```
There's an outside chance that `Buffer.concat(bufs)` could attempt to produce a Buffer larger than `buffer.kMaxLength`, in which case `Buffer.concat()` will throw:

```
RangeError [ERR_INVALID_ARG_VALUE]: The argument 'size' is invalid. Received 4294967297
    at Function.allocUnsafe (node:buffer:374:3)
    at Function.concat (node:buffer:553:25)
```

The error is not very obvious, however, for someone who is just using `collect()`...

Should we check the size before attempting to concat and throw a more specific error?
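Something like this pre-check, perhaps (a sketch only; the error name here is hypothetical):

```js
const { kMaxLength } = require('buffer');

// Sketch: fail with a clearer, collect()-specific error before
// Buffer.concat() can throw its generic RangeError.
// ERR_STREAM_COLLECT_TOO_LARGE is a made-up name for illustration.
if (!isObjectMode && !encoding && len > kMaxLength) {
  throw new ERR_STREAM_COLLECT_TOO_LARGE(len, kMaxLength);
}
return isObjectMode ? bufs : encoding ? bufs.join('') : Buffer.concat(bufs);
```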
Hmm. What if `limit` is actually `Math.min(limit, buffer.kMaxLength)` and it's documented (and tested) as such?
About the limit, I'd prefer we throw a range error instead of silently capping at `buffer.kMaxLength` (we should also throw if the limit is negative or not an integer). An exception is still possible if there isn't enough memory to allocate the buffer.
That would work also, I think.
```js
if (rejectAtLimit) {
  throw new ERR_STREAM_COLLECT_LIMIT_EXCEEDED(len + chunk.length, limit);
} else {
  this.unshift(chunk);
```
Are we certain the `unshift()` here is actually needed? (/cc @mcollina)

For instance, the following appears to work fine without it:

```js
import { PassThrough } from 'node:stream';
const p = new PassThrough();
setTimeout(() => {
  p.write('hello');
  p.write('boom');
}, 10);
const a = p.collect({ limit: 2, rejectAtLimit: false });
p.pipe(process.stdout);
console.log(await a);
```

Generates the output `helloboom<Buffer >` with or without the `unshift()`.
This seems... bad? It fails to gather the first 2 chars in the Buffer.
I think this is correct and needed.
That actually seems correct. While the limit is a number of bytes, it's enforced at the chunk level. So if we have 2 chunks of length 5 and 4 respectively, then a limit of 2 would result in an empty buffer, and a limit of 7 would result in only the first chunk being in the buffer. If there's a desire to have the limit be at the byte level for buffer streams or the character level for string streams, that's not a huge change, so I can go ahead and do that.

The "with or without the `unshift()`" might seem weird, but I think that might be a side effect of piping before awaiting the collect? I think it would be the same if you called `iterator()` instead of collect. You'd get nothing out of that iterator.
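To make the chunk-level enforcement concrete, a small sketch against the API proposed in this PR:

```js
import { PassThrough } from 'node:stream';

// Two chunks, of lengths 5 and 4. The limit is checked per chunk:
// a chunk that would push the total past the limit is not consumed.
const p = new PassThrough();
p.write(Buffer.from('hello')); // length 5
p.write(Buffer.from('wrld'));  // length 4
p.end();

// limit: 2 -> even the first chunk would exceed it -> empty Buffer
// limit: 7 -> 'hello' fits (5 <= 7); 'wrld' would bring it to 9 -> stops
const head = await p.collect({ limit: 7, rejectAtLimit: false });
console.log(head.toString()); // 'hello'
```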
I'm a bit confused here. I thought the limit would be "return once you have X number of things", but it seems like it is being implemented as "return if you have more than X number of things". I'm confused about what the use case of `{ limit: 2 }` above is; when would I use that?
I would prefer an `import { collect } from 'stream/promises'` rather than a method added to `Readable`.
```js
  limit = Infinity,
  rejectAtLimit = true
} = {}) {
  const bufs = [];
```
I would use a linked list instead of an array. More importantly, we can have a more performant implementation if we accumulate as a string.
> I would use a linked list instead of an array.

We'd then have to collapse that into the result. For object mode streams, that means an array, akin to the proposed `%AsyncIterator.prototype%.toArray()`. For Buffer streams we'd have to do some other manual concatenating. For Buffer streams with encoding set, see below.

Is that collapsing going to be an OK tradeoff for the benefit of using a linked list?

> More importantly we can have a more performant implementation if we accumulate as a string.

Sure, if it's a Buffer stream with encoding set. That certainly doesn't apply to object-mode streams, and I don't think it applies to Buffer streams without encoding set either?
The reason why using a linked list is better for this is that the result could be quite big, and you'd have a LOT of memory copying to do to create a single buffer with all the data. In essence, we'd need something like https://www.npmjs.com/package/bl to avoid all that extra performance cost.

> Sure, if it's a Buffer stream with encoding set. That certainly doesn't apply to object mode streams, and I don't think it applies to Buffer streams without encoding set either?

This analysis is correct.
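For the encoding-set case, I'm picturing something along these lines inside the implementation (a sketch, assuming the surrounding `collect()` loop from this PR):

```js
// Sketch: with an encoding set, chunks arrive as strings, so we can
// accumulate with += and skip the bufs array + join() entirely.
let result = '';
for await (const chunk of this) {
  result += chunk; // chunk is already a decoded string here
}
return result;
```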
I don't think it belongs there. Here's the description from the docs: "The `stream/promises` API provides an alternative set of asynchronous utility functions for streams that return `Promise` objects rather than using callbacks." As far as I can tell, the existing functions in there all have non-promise equivalents, and that seems to be the intent. This does not.

I'd really prefer not to hide something away in a less ergonomic API when there's no real need. Previous needs have been for disambiguation, but I don't think there's anything ambiguous here.
I can see it both ways here. Extending Matteo's suggestion...

```js
import { collect } from 'stream/promises';
const readable = fs.createReadStream('...');
await collect(readable, { /** ... options **/ });
```

Makes it possible for us to support this mechanism for all readable streams that support the async iterator protocol but do not yet extend from `stream.Readable`.
Thanks @jasnell. I'm ok for this to live on the `Readable` prototype.
The confusing `limit` parameter needs some rewording/docs on when to use it; the gotcha in my comment seems too easy to hit, and I still don't understand the behavior after a bit of explanation.
The more I think about it, the more I think this needs more discussion. The other alternative is just implementing the Body-mixin-style methods.

The obvious downside is that those don't allow a limit. I'm not sure which approach is best here. What do folks think?
I'd strongly prefer `stream/promises` and would probably block trying to land the WHATWG Body-mixin methods on `Readable`. I do think if this is just a loop to collect data and concat it together, it likely shouldn't be on `Readable`.
I would prefer it if it didn't return a Buffer. This feature seems a bit unnecessary now that we've shipped `stream/consumers`.
An alternative solution could be what I suggested to expressjs: adding `blob()`, `text()`, and `arrayBuffer()` to `IncomingRequest` to make incoming requests behave/look more like a service worker.
Take a look at the new `stream/consumers` module. It adds `arrayBuffer()`, `blob()`, `buffer()`, `text()`, and `json()` utilities. You can easily do `await text(readable)`, for example.
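For reference, usage looks roughly like this (the file path is illustrative):

```js
import { text, buffer } from 'node:stream/consumers';
import { createReadStream } from 'node:fs';

// Consume an entire readable stream as a string...
const readable = createReadStream('example.txt');
console.log(await text(readable));

// ...or as a single Buffer.
const readable2 = createReadStream('example.txt');
console.log(await buffer(readable2));
```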
Yes, consumers makes this unnecessary. Closing.
Returns a promise fulfilling with a sensible representation of the entire stream's data, depending on the stream type.
Buffer streams collect to a single contiguous Buffer. Buffer streams with encoding set collect to a single contiguous string. Object-mode streams collect to an array of objects.
Limiting is allowed, enabling situations like capping request body sizes, or only consuming a limited amount of process.stdin.
Ref: #35192
Ref: nodejs/tooling#68
Rationale
On the web, HTTP response objects have methods like `arrayBuffer()` and `text()` to get the full contents of the stream as a single item. In Node.js, tools like the `body-parser` middleware are often used for server `IncomingMessage` streams, and tools like `concat-stream` are used in other places. It would be remarkably handy for users if such a thing were built in to the `Readable` prototype.

It's worth mentioning that the stage 2 iterator helpers proposal adds a `toArray()` method which reads an async iterator to completion in a similar way. That being said, in order to get, for example, a string from a readable stream of buffers, you'd still have to do something like the first sketch below, whereas this PR enables something more like the second or third, without waiting for that proposal to be implemented in V8.
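Roughly (sketches; the `toArray()` call assumes the iterator-helpers proposal's API on the stream's async iterator):

```js
// 1. With only toArray(), you still join the chunks yourself:
const chunks = await readable[Symbol.asyncIterator]().toArray();
const text = Buffer.concat(chunks).toString('utf8');

// 2. With this PR, setting an encoding collects straight to a string:
readable.setEncoding('utf8');
const text2 = await readable.collect();

// 3. ...or collect to a Buffer and decode at the end:
const text3 = (await readable.collect()).toString('utf8');
```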
This enables one-liners as well. Consider uppercasing stdin: with this PR, the long form in the first sketch below shortens to the one-liner in the second.
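For instance (sketches):

```js
// Without collect(): accumulate stdin by hand, then transform it.
let data = '';
process.stdin.setEncoding('utf8');
for await (const chunk of process.stdin) data += chunk;
process.stdout.write(data.toUpperCase());

// With collect(): a single expression.
process.stdout.write((await process.stdin.collect()).toString().toUpperCase());
```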
Why the name? Rust uses this name for similar functionality and it doesn't appear to clash with the iterator helpers proposal or other methods.
Potential Alternative
It might make sense to add this as a static method on `Readable`, or a helper function on `util`, like `util.collect()`, that behaves the way the stream method does in this PR, but also attempts to act appropriately on arbitrary (async) iterables. This would effectively be a slightly more stream-friendly version of the upcoming `toArray()`.

Help?