
stream: add collect method #38975

Closed
wants to merge 3 commits

Conversation

bengl
Member

@bengl bengl commented Jun 9, 2021

Returns a promise fulfilling with a sensible representation of the entire stream's data, depending on the stream type.

Buffer streams collect to a single contiguous Buffer. Buffer streams with encoding set collect to a single contiguous string. Object-mode streams collect to an array of objects.

Limiting is supported, enabling use cases like capping request body sizes or consuming only a limited amount of process.stdin.
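
For illustration, here is a sketch of how the proposed method might be used to cap a request body. It assumes the options form used in later revisions of this PR ({ limit }), and the error name matches the one in this PR's diff; none of this is an existing Node.js API.

const http = require('http');

http.createServer(async (req, res) => {
  try {
    // Rejects with ERR_STREAM_COLLECT_LIMIT_EXCEEDED once more than 1 MiB arrives.
    const body = await req.collect({ limit: 1024 * 1024 });
    res.end(`received ${body.length} bytes`);
  } catch (err) {
    res.statusCode = 413;
    res.end('payload too large');
  }
}).listen(3000);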

Ref: #35192
Ref: nodejs/tooling#68


Rationale

On the web, HTTP response objects have methods like arrayBuffer() and text() to get the full contents of the stream as a single item. In Node.js, tools like the body-parser middleware are often used for server IncomingMessage streams, and tools like concat-stream are used in other places. It would be remarkably handy for users if such a thing were built into the Readable prototype.

It's worth mentioning that the stage 2 iterator helpers proposal adds a toArray() method which reads an async iterator to completion in a similar way. That being said, in order to get, for example, a string from a readable stream of buffers, you'd still have to do something like

console.log(Buffer.concat(await readable.toArray()).toString('utf8'));

whereas this PR enables something more like this

readable.setEncoding('utf8');
console.log(await readable.collect());

or

console.log((await readable.collect()).toString());

without waiting for that proposal to be implemented in V8.

This enables one-liners as well. Consider the following one-liner to uppercase stdin.

cat myfile.txt | node -e "(async()=>{const x=[];for await (const c of process.stdin)x.push(c);console.log((''+Buffer.concat(x)).toUpperCase())})()"

With this PR, we can shorten this to the following.

cat myfile.txt | node -e "(async()=>{console.log((''+await process.stdin.collect()).toUpperCase())})()"

Why the name? Rust uses this name for similar functionality and it doesn't appear to clash with the iterator helpers proposal or other methods.

Potential Alternative

It might make sense to add this as a static method on Readable, or a helper function on util, like util.collect(), that behaves the way the stream method does in this PR but also attempts to act appropriately on arbitrary (async) iterables. This would effectively be a slightly more stream-friendly version of the upcoming toArray().
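
As a rough sketch of what such a helper could look like (hypothetical name and placement, not an actual Node.js API), it would special-case iterables of Buffers or strings and fall back to an array for everything else:

async function collect(iterable) {
  const items = [];
  for await (const item of iterable) {
    items.push(item);
  }
  if (items.length && items.every((i) => Buffer.isBuffer(i))) {
    return Buffer.concat(items);
  }
  if (items.length && items.every((i) => typeof i === 'string')) {
    return items.join('');
  }
  return items; // arbitrary (async) iterables behave like toArray()
}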

Help?

  • I'm not sure the documentation describes what's going on in the best way, so I'm hoping reviewers can chime in a bit there.
  • Are there edge-cases I'm missing in the test?
  • Most of the work is deferred to the async iteration implementation. Maybe there's a more efficient and/or resilient way to do this?

@github-actions github-actions bot added errors Issues and PRs related to JavaScript errors originated in Node.js core. needs-ci PRs that need a full CI run. labels Jun 9, 2021
@lpinca lpinca added stream Issues and PRs related to the stream subsystem. and removed errors Issues and PRs related to JavaScript errors originated in Node.js core. labels Jun 9, 2021
@targos targos added the semver-minor PRs that contain new features and should be released in the next minor version. label Jun 9, 2021
@targos
Member

targos commented Jun 9, 2021

Are there edge-cases I'm missing in the test?

What happens if you call collect(), don't await it, and then call collect() again, or pipe() on the stream?

@bmeck
Member

bmeck commented Jun 9, 2021

Can we avoid rejecting? Is there a reason to reject rather than return the buffered amount?

@bengl
Member Author

bengl commented Jun 9, 2021

@targos

In either case, the first collect() gets the full result, and any subsequent effort to get the data will give empty results. This makes sense to me. I'll add tests ensuring this, and document accordingly, also clarifying that this uses the async iterator internally.

@bmeck

It could just stop at/before the limit, but I think that would also require some kind of indication that the limit has been reached. Do you have a suggestion for how to give that info to calling code?

const encoding = isObjectMode ? false : this._readableState.encoding;
for await (const chunk of this) {
  const chunkLen = isObjectMode ? 1 : chunk.length;
  if (len + chunkLen > limit) {
Member

shouldn't this push back the chunk part that is past the limit for other consumers to use? e.g.

let buff = new TextEncoder().encode('Hello world!');
stream.write(buff);
await stream.collect(5); // ~ 'Hello'
await stream.collect(7); // ~ ' world!'

I think a test is fine, I'm not 100% familiar with the behavior in async iterator mode. Also, does this need to account for unicode?

Member Author

shouldn't this push back the chunk part that is past the limit for other consumers to use?

Maybe in a non-rejecting implementation like we're discussing elsewhere in this PR, yeah. Otherwise, "other consumers" don't exist.

What you're doing in the example here seems more like it should happen as an option to .iterator().

e.g.

let buff = new TextEncoder().encode('Hello world!');
stream.write(buff);
let iter = stream.iterator({ limit: 5 });
(await iter.next()).value; // ~ 'Hello'
(await iter.next()).value; // ~ ' world!'

Or an async version of .read([size]).

Also, does this need to account for unicode?

I can add a test or two, but I think it doesn't matter? If encoding is set, then StringDecoder will do the needful behind the scenes, won't it?

Member

@bengl using a stream as a tee/buffered stream is my concern. There isn't a clear guarantee no other listeners are consuming on the same stream or will in the future even if we reject.

Member Author

We can optionally break out of the loop on the limit, allowing other listeners to get future chunks. This would be the rejectAtLimit: false option I propose below.

Member

so a slight difference is that you need to push back the unused portion of the buffers being read. So if you only partially read "hello world!" for example, the subsequent .collect() calls can get the remainder rather than having it be dropped.

Member Author

That should be easy enough with an .unshift() before the break. I'll do that and add tests as soon as I have a bit of spare time.

Member Author

.. and that was this evening. PTAL at the latest changes.

@bmeck
Member

bmeck commented Jun 9, 2021

@bengl I'm not sure I understand the practical need to know if it is at the end in most cases. If the limit is infinity you know it is the end. I think for the majority of time you either want to drain the whole stream or limit the consumption, usually not both.

@bengl
Member Author

bengl commented Jun 9, 2021

@bmeck

I think for the majority of time you either want to drain the whole stream or limit the consumption, usually not both.

The use case for limiting I had in mind is this comment and the subsequent comments. If you're limiting the amount of data because you can't/won't deal with large payloads, then you want to know when you've hit that limit so that you don't then proceed to try and decode it with the end missing.

@bmeck
Member

bmeck commented Jun 9, 2021

@bengl right now we don't close the stream past the limit in this PR. Was it intended to close the stream so that it doesn't keep filling past the limit even after we reject? If so, that would be a more meaningful reason to reject at the limit by default. Perhaps an opt-out would be nice though.

@bengl
Member Author

bengl commented Jun 9, 2021

@bmeck

Hmm, how about this:

// showing defaults
readable.collect({
  limit: Infinity,
  rejectAtLimit: true
})

Where rejectAtLimit: true calls destroy() and rejects at the limit, and false does neither of the two.
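
To illustrate the two proposed modes (hypothetical API, as sketched above):

// Default: destroy the stream and reject once the limit is exceeded.
const all = await readable.collect({ limit: 1024 });

// Opt out: stop at the limit, leave the stream alive for other consumers,
// and resolve with whatever was read so far.
const head = await readable.collect({ limit: 1024, rejectAtLimit: false });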

@bmeck
Member

bmeck commented Jun 9, 2021

Sounds good to me. I'm particularly interested in things like reading headers from a stream's contents and then only reading the body as needed and/or piping the body to other places. If it does allow limits without rejecting, I can do that.

@jasnell
Member

jasnell commented Jun 9, 2021

Question... why not simply implement the same methods and behaviors as defined by the Body mixin?

Specifically...

await readable.arrayBuffer();   // collect that resolves as an ArrayBuffer
await readable.text();  // collect that resolves as a string
await readable.blob();  // collect that resolves as a blob

For object mode streams, the async iterator approach would seem to be fine (and ideal)

For some context... the new QUIC API that I'm working on now will support an API like the following...

// "stream" here refers to a QUIC Stream, not a require('stream') stream.
// Yes, confusing, but...
session.onstream = async ({ stream, respondWith }) => {

  console.log(await stream.text());
  // or await stream.arrayBuffer()
  // or await stream.blob()
  // or await stream.text()
  // or stream.readable() (no await because it returns a stream)

  respondWith({ body: 'Hello World' });
};

(Specifically, I want to avoid adding a new Node.js-only API when a similar construct already exists)

@bmeck
Member

bmeck commented Jun 9, 2021

@jasnell I think in particular the ability to have a limit is a different API than those.

@bengl
Member Author

bengl commented Jun 9, 2021

@jasnell

Why not both!

That's not a bad idea! That being said, there are differences between this implementation of .collect() and the Body mixin methods:

  1. Object-mode streams exist. What would happen if we call .arrayBuffer() or .text() (or .json() for that matter) on an object-mode stream? Sure, we could define some behaviour, but see the points below about the Body mixin.
  2. The Body mixin methods don't support Buffer. So long as Buffer is still in Node.js and streams support Buffer, having a method that gives us a Buffer rather than an ArrayBuffer is pretty handy.
  3. I chose the name collect specifically to not collide with the Body mixin methods (or the iterator helper methods), so that they shouldn't be precluded by this method's existence, although see the point below.
  4. The QUIC stream in your example is not a Readable, but has a method that returns a Readable. Streams from http/https, net, etc. don't have this pattern, so I'd be worried about the confusion between a stream and a handle-ish object that gives you a stream (or other stuff).

@jasnell
Member

jasnell commented Jun 9, 2021

@bmeck:

I think in particular the ability to have a limit is a different API than those.

Yes. But is that limit necessary? Or, perhaps, is it the right way to handle it? This is difficult because I don't think the Body mixin handles this part well either.

If we're designing a new API for this anyway, what I'd almost prefer to see is a Progress + AbortController type approach...

e.g.

const ac = new AbortController();
const limit = 100;

const results = await readable.collect({
  signal: ac.signal,
  progress: (read) => { if (read > limit) ac.abort(); },
});

// Or, by timeout...

setTimeout(() => ac.abort(), 10000);

const results = await readable.collect({
  signal: ac.signal,
});

The key question I have, though, is: do the requirements here justify adding a new API, or are the existing standardized APIs sufficient (even if less capable)?

@jasnell
Member

jasnell commented Jun 9, 2021

@bengl:

Object-mode streams exist. What would happen if we call .arrayBuffer() or .text() (or .json() for that matter) on an object-mode stream? Sure, we could define some behaviour, but see the points below about the Body mixin.

I would expect those to return immediately rejected promises.

The Body mixin methods don't support Buffer. So long as Buffer is still in Node.js and streams support Buffer, having a method that gives us a Buffer rather than an ArrayBuffer is pretty handy.

Eh, I can see the argument, but personally I could take it or leave it. Even so, that could be done with an await readable.buffer(), which would fit with the existing Body mixin naming pattern. Point is taken though.

I chose the name collect specifically to not collide with the Body mixin methods (or the iterator helper methods), so that they shouldn't be precluded by this method's existence, although see the point below.
The QUIC stream in your example is not a Readable, but has a method that returns a Readable. Streams from http/https, net, etc. don't have this pattern, so I'd be worried about the confusion between a stream and a handle-ish object that gives you a stream (or other stuff).

Fair points. If we decide that a new API is required, I'm certainly not opposed to collect() as an option.

@bmeck
Member

bmeck commented Jun 9, 2021

If we're designing a new API for this anyway, what I'd almost prefer to see is a Progress + AbortController type approach...

This seems fine, but my question is how it would differ from this PR with a {signal} option.

@jasnell
Member

jasnell commented Jun 9, 2021

@bmeck:

This seems fine, but how would it differ from this with a {signal} option is my question.

Only in that it gives more flexibility in how it's handled. I could choose to simply log the overage with a warning... or simply track the progress of reading.

@bmeck
Member

bmeck commented Jun 9, 2021

Only in that it gives more flexibility in how it's handled. I could choose to simply log the overage with a warning... or simply track the progress of reading.

What does "track the progress" mean here? I'm confused on the API you are trying to discuss since I don't know its feature set exactly.

@jasnell
Member

jasnell commented Jun 9, 2021

What does "track the progress" mean here? I'm confused on the API you are trying to discuss since I don't know its feature set exactly.

Literally something like...

console.log(await readable.collect({
  progress(read) {
    console.log(`${read} bytes read`);
  }
}));

// 10 bytes read
// 20 bytes read
// 30 bytes read
// Buffer < ... >

Again, I'm putting this up for the sake of discussion. Is just having limit here the right choice for the use case?

@bmeck
Member

bmeck commented Jun 9, 2021

I'm not really interested in if "is limit useful", but more in what features might be missing / useful. So far I see:

  • AbortSignal
    • Seems to be universal in wanting to cancel things, no questions here
  • Progress
    • Likely we don't want to emit events or something if missing since it would just add waste
    • When is it used except to support progress bars?

@jasnell
Member

jasnell commented Jun 9, 2021

To be fair, "progress" is something that could be implemented externally to all this, simply by registering an on('data') handler and tracking that externally. My question more is, is limit needed at all? So long as we have a way of canceling using AbortSignal, can we drop that entirely? (that would also side step the discussion of what we should do with the extraneous chunk read above the limit)

@bmeck
Member

bmeck commented Jun 9, 2021

can we drop that entirely? (that would also side step the discussion of what we should do with the extraneous chunk read above the limit)

I don't think so, unless we expect aborts to also unshift the parts they don't need. For example, reading the first X bytes of a TAR is useful to see what headers are present. I'm unclear on how we would do that workflow w/ aborts.

@bengl
Member Author

bengl commented Jun 9, 2021

In what cases would using AbortController (and not limit) not be more readably handled by just using the async iterator directly?

@bmeck
Member

bmeck commented Jun 9, 2021

I'm really excited about this method because it could allow things like:

let headers = await tarStream.collect({limit: HEADERS});
let BODY_SIZE = headers.readUint32BE(_);
await tarStream.collect({limit: BODY_SIZE});
// continue

Having the ability to run until a delimiter could be useful, but it would likely be difficult to agree on what that means, at least in this API.

@Qard
Member

Qard commented Jun 9, 2021

The Body API could be implemented on top of this for instances where its underlying data source is a readable stream, so I don't see any reason the two can't coexist.
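
For example, a hypothetical sketch of layering Body-style helpers on top of the proposed collect() (illustrative only, not an actual API):

async function text(readable) {
  readable.setEncoding('utf8');
  return readable.collect();
}

async function arrayBuffer(readable) {
  const buf = await readable.collect();
  return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
}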

@bengl
Member Author

bengl commented Jun 10, 2021

I implemented the rejectAtLimit option. In order for it to behave like @bmeck was asking, this required using the .iterator() method and setting its destroyOnReturn option to false. This raises the question of what to do with destroyOnError. For now, I'm leaving it as the default, but maybe that should be an option on .collect() that's passed down to .iterator()?

@jasnell I'm still thinking that non-limit use cases for AbortController would actually be better served by using the .iterator() method directly, mainly for readability. That being said, if there's a use case where it's more ergonomic for developers to use an AbortController-enabled API, I'd like to see what that actually looks like. Also, what that looks like when we don't want to destroy the stream at an abort situation.

@bmeck In terms of a delimiter: I like the idea, but I think you're right that it requires a lot of hashing out. I think that might be better off dealt with in a future PR.

@nodejs-github-bot
Collaborator

const assert = require('assert');

const hello = Buffer.from('hello, ');
const world = Buffer.from('world');
Member

Can you include multibyte characters in the test to make sure those are collected properly? (they are, but to make sure there are no regressions later)

  limit = Infinity,
  rejectAtLimit = true
} = {}) {
  const bufs = [];
Member

This needs validation for the options object.... like...

Readable.prototype.collect = async function(options = {}) {
  validateObject(options, 'options');   // from internal/validators
  const {
    limit = Infinity,
    rejectAtLimit = true,
  } = options;
  validateNumber(limit, 'options.limit');
  validateBoolean(rejectAtLimit, 'options.rejectAtLimit');
  // ...
};

    len += chunkLen;
    bufs.push(chunk);
  }
  return isObjectMode ? bufs : encoding ? bufs.join('') : Buffer.concat(bufs);
Member

There's an outside chance that Buffer.concat(bufs) could attempt to produce a Buffer larger than buffer.kMaxLength, in which case Buffer.concat() will throw ..

RangeError [ERR_INVALID_ARG_VALUE]: The argument 'size' is invalid. Received 4294967297
    at Function.allocUnsafe (node:buffer:374:3)
    at Function.concat (node:buffer:553:25)
    // ...

The error is not very obvious, however, for someone who is just using collect() ...

Should we check the size before attempting to concat and throw a more specific error?
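
One possible shape of the suggested pre-check (the error type is illustrative only; `len` is assumed to already track the total number of bytes collected):

const { kMaxLength } = require('buffer');

if (len > kMaxLength) {
  throw new RangeError(
    `Collected ${len} bytes, which exceeds buffer.kMaxLength (${kMaxLength})`);
}
return Buffer.concat(bufs, len);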

Member Author

Hmm.

What if limit is actually Math.min(limit, buffer.kMaxLength) and it's documented (and tested) as such?

Member

About the limit, I'd prefer we throw a range error instead of silently capping at buffer.kMaxLength (we should also throw if limit is negative or not an integer)

An exception is still possible if there isn't enough memory to allocate the buffer.

Member

That would work also, I think.

    if (rejectAtLimit) {
      throw new ERR_STREAM_COLLECT_LIMIT_EXCEEDED(len + chunk.length, limit);
    } else {
      this.unshift(chunk);
Member

Are we certain the unshift() here is actually needed? (/cc @mcollina)

For instance, the following appears to work fine without it:

import { PassThrough } from 'node:stream';

const p = new PassThrough();

setTimeout(() => {
  p.write('hello');
  p.write('boom');
}, 10);

const a = p.collect({ limit: 2, rejectAtLimit: false });

p.pipe(process.stdout);

console.log(await a);

Generates the output: helloboom<Buffer > with or without the unshift()

Member

this seems... bad? it fails to gather the 2 first chars in the Buffer

Member

I think this is correct and needed.

Member Author

That actually seems correct. While the limit is a number of bytes, it's enforced at the chunk level. So if we have 2 chunks of length 5 and 4 respectively, then a limit of 2 would result in an empty buffer and a limit of 7 would result in only the first chunk being in the buffer. If there's a desire to have the limit be at the byte level for buffer streams or character level for string streams, that's not a huge change so I can go ahead and do that.

The "with or without the unshift()" might seem weird, but I think that might be a side-effect of piping before await the collect? I think it would be the same if you called iterator() instead of collect. You'd get nothing out of that iterator.

Member

I'm a bit confused here. I thought limit would be "return once you have X number of things", but it seems like it is being implemented as "return if you have more than X number of things"? I'm confused about what the use case of {limit: 2} is above; when would I use that?

Member

@mcollina mcollina left a comment

I would prefer an import { collect } from 'stream/promises' rather than a method added to Readable.

  limit = Infinity,
  rejectAtLimit = true
} = {}) {
  const bufs = [];
Member

@mcollina mcollina Jun 10, 2021

I would use a linked list instead of an array.

More importantly we can have a more performant implementation if we accumulate as a string.

Member Author

I would use a linked list instead of an array.

We'd then have to collapse that into the result. For object mode streams, that means an array, akin to the proposed %AsyncIterator.prototype%.toArray(). For Buffer streams we'd have to do some other manual concatenating. For Buffer streams with encoding set, see below.

Is that collapsing going to be an OK tradeoff for the benefit of using a linked list?

More importantly we can have a more performant implementation if we accumulate as a string.

Sure, if it's a Buffer stream with encoding set. That certainly doesn't apply to object mode streams, and I don't think it applies to Buffer streams without encoding set either?

Member

The reason why using a Linked List is better for this is that the result could be quite big and you'd have a LOT of memory copying to do to create a single buffer with all the data. In essence, we'd need something like https://www.npmjs.com/package/bl to avoid all that extra performance cost.

Sure, if it's a Buffer stream with encoding set. That certainly doesn't apply to object mode streams, and I don't think it applies to Buffer streams without encoding set either?

This analysis is correct.
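
A hypothetical sketch of that fast path, for a Buffer stream with an encoding set (chunks arrive as strings in that case), avoiding the array-and-join step:

async function collectAsString(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk; // already a string because an encoding is set
  }
  return result;
}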

@bengl
Member Author

bengl commented Jun 10, 2021

@mcollina

I would prefer an import { collect } from 'stream/promises' rather than a method added to Readable.

I don't think it belongs there. Here's the description from the docs: "The stream/promises API provides an alternative set of asynchronous utility functions for streams that return Promise objects rather than using callbacks."

As far as I can tell, the existing functions in there all have non-promise equivalents, and that seems to be the intent. This does not. I'd really prefer not to hide something away in a less ergonomic API when there's no real need. Previous needs have been for disambiguation, but I don't think there's anything ambiguous here.

@jasnell
Member

jasnell commented Jun 10, 2021

I would prefer an import { collect } from 'stream/promises' rather than a method added to Readable.

I can see it both ways here. Extending Readable.prototype is difficult given the existence of readable-stream and Readables-that-don't-really-extend-from-stream.Readable.

Matteo's suggestion...

import { collect } from 'stream/promises';

const readable = fs.createReadStream('...');

await collect(readable, { /** ... options **/ });

Makes it possible for us to support this mechanism for all Readable streams that support async iterator but do not yet extend from a stream.Readable base class that has collect().

@mcollina
Member

Thanks @jasnell. I'm ok for this to live on the 'stream' module as well.

Member

@bmeck bmeck left a comment

The confusing limit parameter needs some rewording/docs on when to use it; the gotcha in my comment seems too easy to hit, and I still don't understand the behavior after a bit of explanation.

@bengl
Member Author

bengl commented Jun 11, 2021

The more I think about it, the more I think limit is confusing and may be better left to a different method/helper-method/userland. I'm warming up to the AbortSignal (without progress) approach, and with the switch to making it a helper function in a separate place (via @mcollina's suggestion and @jasnell's clarification), not assuming that there's a stream underlying the iterable. That means leaving any unshifting to the user, which I think is probably fine for now, so long as the abort behavior is well-documented. @bmeck does this sound okay to you?

The other alternative is just implementing Body like @jasnell suggested, and letting .arrayBuffer() exist as-is, so folks can get a Buffer from that if they want. I see 3 options for implementing this:

  1. Directly on Readable.prototype. This would suffer from the confusing fact that readable.readable() === readable, but it might be an okay compromise.
  2. A Body object returned from a method like .body(). Fixes the confusion above, but adds an extra step of indirection (which might not be a big deal).
  3. util.bodyFromIterator() or some similar helper function elsewhere. Keeps everything out of readable-stream's way, and decouples the concept from exclusively streams.

The obvious downside is that the Body mixin currently has no way of aborting these operations. Seeing as we'd already have to deviate from the spec by having .readable() return a Node.js Readable, maybe it's fine to also give these methods a signal option?

I'm not sure which approach is best here. What do folks think?

@bmeck
Member

bmeck commented Jun 11, 2021

I'd strongly prefer not to, and would probably block, trying to land WHATWG Body w/ .readable returning a Node Readable.

I do think if this is just a loop to collect data and concat it together it likely shouldn't be on stream in any fashion directly. Having it on util seems reasonable.

Per limit, I'm a bit torn as without limit you have over-reads and potentially problems like infinite payloads. I would like some sort of mitigation here if we do implement our own API rather than a more compatible WHATWG API. If we don't have limit I would prefer we just implement something that takes an async iterable and returns a subset of the Body mixin as I don't see the direct merit creating custom APIs if they have such a high overlap of features and no expansion in utility. One nice thing about Body is you can know what type of thing you get out of the results without knowing the current encoding a stream is using.

@jimmywarting

jimmywarting commented Aug 20, 2021

I would prefer it if it didn't return a Buffer.
Buffer is a Node.js-only feature that doesn't fit well into Deno or the browser; something better would be to return an ArrayBuffer or a Uint8Array. If developers really want a Buffer, they can upgrade it to a Buffer with Buffer.from.

This feature seems a bit unnecessary now that we've shipped consumers.

@jimmywarting

An alternative solution could be what I suggested to expressjs:
expressjs/express#4536

Adding blob(), text() & arrayBuffer() to IncomingRequest to make incoming request behave/look more like a service worker

@jasnell
Member

jasnell commented Aug 20, 2021

Take a look at the new stream/consumers module. It adds arrayBuffer(), blob(), buffer(), text(), and json() utilities.

You can easily do await text(process.stdin) for instance to consume the input.
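
For example (assuming an ES module context for top-level await):

import { text, buffer, json } from 'node:stream/consumers';

const body = await text(process.stdin);      // string
// const buf = await buffer(process.stdin);  // Buffer
// const obj = await json(process.stdin);    // parsed JSON
console.log(body.toUpperCase());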

@bengl
Member Author

bengl commented Aug 20, 2021

Yes, consumers makes this unnecessary. Closing.

@bengl bengl closed this Aug 20, 2021