Skip to content

Commit

Permalink
stream: add collect method
Browse files Browse the repository at this point in the history
Returns a promise fulfilling with a sensible representation of the
entire stream's data, depending on the stream type.

Buffer streams collect to a single contiguous Buffer. Buffer streams
with encoding set collect to a single contiguous string. Object-mode
streams collect to an array of objects.

Limiting is allowed, enabling situations like capping request body
sizes, or only consuming a limited amount of process.stdin.

Ref: nodejs#35192
Ref: nodejs/tooling#68
  • Loading branch information
bengl committed Jun 9, 2021
1 parent 306a57d commit 8dc2246
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 0 deletions.
16 changes: 16 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,22 @@ added: v0.9.4
The `'resume'` event is emitted when [`stream.resume()`][stream-resume] is
called and `readableFlowing` is not `true`.

#### `readable.collect([limit])`
<!-- YAML
added: REPLACEME
-->

* `limit` {Number} maximum number of bytes to collect. In object streams, this
is the maximum number of items to collect. **Default:** Infinity.
* Returns: {Promise} Fulfills with all the data from the stream, in the form of:
* If `readable` is a `Buffer` stream with encoding set, then a string.
* If `readable` is a `Buffer` stream with encoding unset, then a `Buffer`.
* If `readable` is an object mode stream, then an `Array` of objects.

Reads the stream to its `'end'`, collecting all the data in order.

If the `limit` is reached, the promise is rejected with an error.

##### `readable.destroy([error])`
<!-- YAML
added: v8.0.0
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
'Cannot call %s after a stream was finished',
Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_COLLECT_LIMIT_EXCEEDED', 'Stream data length %s is beyond limit %s',
RangeError);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_COLLECT_LIMIT_EXCEEDED,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
Expand Down Expand Up @@ -1057,6 +1058,22 @@ Readable.prototype.wrap = function(stream) {
return this;
};

Readable.prototype.collect = async function(limit = Infinity) {
const bufs = [];
let len = 0;
const isObjectMode = this._readableState.objectMode;
const encoding = isObjectMode ? false : this._readableState.encoding;
for await (const chunk of this) {
const chunkLen = isObjectMode ? 1 : chunk.length;
if (len + chunkLen > limit) {
throw new ERR_STREAM_COLLECT_LIMIT_EXCEEDED(len + chunk.length, limit);
}
len += chunkLen;
bufs.push(chunk);
}
return isObjectMode ? bufs : encoding ? bufs.join('') : Buffer.concat(bufs);
};

Readable.prototype[SymbolAsyncIterator] = function() {
return streamToAsyncIterator(this);
};
Expand Down
127 changes: 127 additions & 0 deletions test/parallel/test-stream-readable-collect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict';

const common = require('../common');

const { Readable } = require('stream');
const assert = require('assert');

const hello = Buffer.from('hello, ');
const world = Buffer.from('world');

async function tests() {
{ // buffer, no limit
const r = new Readable();

r.push(hello);
r.push(world);
r.push(null);

const result = await r.collect();

assert.strictEqual(result.compare(Buffer.from('hello, world')), 0);
}

{ // buffer, under limit
const r = new Readable();

r.push(hello);
r.push(world);
r.push(null);

await assert.doesNotReject(r.collect(50));
}

{ // buffer, over limit
const r = new Readable();
r.push(hello);
r.push(world);
r.push(null);

await assert.rejects(r.collect(6), {
code: 'ERR_STREAM_COLLECT_LIMIT_EXCEEDED',
});
}

{ // buffer, setEncoding, no limit
const r = new Readable();

r.setEncoding('utf8');

r.push(hello);
r.push(world);
r.push(null);

const result = await r.collect();

assert.strictEqual(result, 'hello, world');
}

{ // buffer, setEncoding, under limit
const r = new Readable();

r.setEncoding('utf8');

r.push(hello);
r.push(world);
r.push(null);

await assert.doesNotReject(r.collect(50));
}

{ // buffer, setEncoding, over limit
const r = new Readable();

r.setEncoding('utf8');

r.push(hello);
r.push(world);
r.push(null);

await assert.rejects(r.collect(6), {
code: 'ERR_STREAM_COLLECT_LIMIT_EXCEEDED',
});
}

{ // object, no limit
const r = new Readable({ objectMode: true });

const objs = [{ n: 0 }, { n: 1 }];

r.push(objs[0]);
r.push(objs[1]);
r.push(null);

const result = await r.collect();

assert.deepStrictEqual(result, objs);
}

{ // object, under limit
const r = new Readable({ objectMode: true });

const objs = [{ n: 0 }, { n: 1 }];

r.push(objs[0]);
r.push(objs[1]);
r.push(null);

assert.doesNotReject(r.collect(5));
}

{ // object, over limit
const r = new Readable({ objectMode: true });

const objs = [{ n: 0 }, { n: 1 }, { n: 2 }];

r.push(objs[0]);
r.push(objs[1]);
r.push(objs[2]);
r.push(null);

await assert.rejects(r.collect(2), {
code: 'ERR_STREAM_COLLECT_LIMIT_EXCEEDED',
});
}
}

tests().then(common.mustCall());

0 comments on commit 8dc2246

Please sign in to comment.