Skip to content

Commit

Permalink
stream: add readableEnded
Browse files Browse the repository at this point in the history
Adds a readableEnded property and improved finished compat with possible
stream-like objects.

PR-URL: nodejs#28814
Refs: nodejs#28813
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and targos committed Aug 20, 2019
1 parent f42eb01 commit a9f8b62
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 2 deletions.
9 changes: 9 additions & 0 deletions doc/api/stream.md
Expand Up @@ -1126,6 +1126,15 @@ added: v12.7.0
Getter for the property `encoding` of a given `Readable` stream. The `encoding`
property can be set using the [`readable.setEncoding()`][] method.

##### readable.readableEnded
<!-- YAML
added: REPLACEME
-->

* {boolean}

Becomes `true` when [`'end'`][] event is emitted.

##### readable.readableHighWaterMark
<!-- YAML
added: v9.3.0
Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_readable.js
Expand Up @@ -196,6 +196,16 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
}
});

Object.defineProperty(Readable.prototype, 'readableEnded', {
// Making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get() {
return this._readableState ? this._readableState.endEmitted : false;
}
});

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/async_iterator.js
Expand Up @@ -132,7 +132,7 @@ const createReadableStreamAsyncIterator = (stream) => {
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream._readableState.endEmitted,
value: stream.readableEnded || stream._readableState.endEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/end-of-stream.js
Expand Up @@ -42,7 +42,8 @@ function eos(stream, opts, callback) {
if (!readable) callback.call(stream);
};

var readableEnded = stream._readableState && stream._readableState.endEmitted;
var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
Expand Down
9 changes: 9 additions & 0 deletions test/parallel/test-stream-finished.js
Expand Up @@ -3,6 +3,7 @@
const common = require('../common');
const { Writable, Readable, Transform, finished } = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
const { promisify } = require('util');

Expand Down Expand Up @@ -175,3 +176,11 @@ const { promisify } = require('util');
rs.push(null);
rs.resume();
}

{
const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
finished(streamLike, common.mustCall);
streamLike.emit('close');
}
33 changes: 33 additions & 0 deletions test/parallel/test-stream-readable-ended.js
@@ -0,0 +1,33 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const assert = require('assert');

// basic
{
// Find it on Readable.prototype
assert(Readable.prototype.hasOwnProperty('readableEnded'));
}

// event
{
const readable = new Readable();

readable._read = () => {
// The state ended should start in false.
assert.strictEqual(readable.readableEnded, false);
readable.push('asd');
assert.strictEqual(readable.readableEnded, false);
readable.push(null);
assert.strictEqual(readable.readableEnded, false);
};

readable.on('end', common.mustCall(() => {
assert.strictEqual(readable.readableEnded, true);
}));

readable.on('data', common.mustCall(() => {
assert.strictEqual(readable.readableEnded, false);
}));
}

0 comments on commit a9f8b62

Please sign in to comment.