Skip to content

Commit

Permalink
Revert "stream: make finished call the callback if the stream is closed"
Browse files Browse the repository at this point in the history
This reverts commit b03845b.
  • Loading branch information
mcollina committed Oct 9, 2019
1 parent 7fc6286 commit fe76c85
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 330 deletions.
10 changes: 10 additions & 0 deletions lib/internal/streams/async_iterator.js
Expand Up @@ -112,6 +112,16 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
58 changes: 19 additions & 39 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -13,18 +13,6 @@ function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand All @@ -40,51 +28,43 @@ function eos(stream, opts, callback) {

callback = once(callback);

const onerror = (err) => {
callback.call(stream, err);
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (writableFinished || readableEnded || stream.destroyed ||
stream.aborted) {
if (opts.error !== false) stream.on('error', onerror);
// A destroy(err) call emits error in nextTick.
process.nextTick(callback.bind(stream));
return () => {
stream.removeListener('error', onerror);
};
}

let readable = opts.readable ||
(opts.readable !== false && isReadable(stream));
let writable = opts.writable ||
(opts.writable !== false && isWritable(stream));
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writable = false;
writableFinished = true;
writableEnded = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
let err;
if (readable && !readableEnded) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (writable && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
};

Expand Down
106 changes: 0 additions & 106 deletions test/parallel/test-http-client-finished.js
Expand Up @@ -25,109 +25,3 @@ const { finished } = require('stream');
.end();
}));
}

{
// Test abort before finished.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}, common.mustNotCall());
req.abort();
finished(req, common.mustCall(() => {
server.close();
}));
}));
}

{
// Test abort after request.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).end();
finished(req, (err) => {
common.expectsError({
type: Error,
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
server.close();
}));
});
req.abort();
}));
}

{
// Test abort before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
req.abort();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test destroy before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
res.destroy();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test finish after end.

const server = http.createServer(function(req, res) {
res.end('asd');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

0 comments on commit fe76c85

Please sign in to comment.