Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "stream: make finished call the callback if the stream is closed" #29717

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions lib/internal/streams/async_iterator.js
Expand Up @@ -112,6 +112,16 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
Expand Down
64 changes: 23 additions & 41 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -13,18 +13,6 @@ function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand All @@ -40,49 +28,43 @@ function eos(stream, opts, callback) {

callback = once(callback);

const onerror = (err) => {
callback.call(stream, err);
};

let writableFinished = stream.writableFinished ||
(stream._writableState && stream._writableState.finished);
let readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (writableFinished || readableEnded || stream.destroyed ||
stream.aborted) {
if (opts.error !== false) stream.on('error', onerror);
// A destroy(err) call emits error in nextTick.
process.nextTick(callback.bind(stream));
return () => {
stream.removeListener('error', onerror);
};
}

const readable = opts.readable ||
(opts.readable !== false && isReadable(stream));
const writable = opts.writable ||
(opts.writable !== false && isWritable(stream));
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

var writableEnded = stream._writableState && stream._writableState.finished;
const onfinish = () => {
writableFinished = true;
if (!readable || readableEnded) callback.call(stream);
writable = false;
writableEnded = true;
if (!readable) callback.call(stream);
};

var readableEnded = stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
const onend = () => {
readable = false;
readableEnded = true;
if (!writable || writableFinished) callback.call(stream);
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
let err;
if (readable && !readableEnded) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
} else if (writable && !writableFinished) {
callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
if (!stream._readableState || !stream._readableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
if (writable && !writableEnded) {
if (!stream._writableState || !stream._writableState.ended)
err = new ERR_STREAM_PREMATURE_CLOSE();
return callback.call(stream, err);
}
};

Expand Down
106 changes: 0 additions & 106 deletions test/parallel/test-http-client-finished.js
Expand Up @@ -25,109 +25,3 @@ const { finished } = require('stream');
.end();
}));
}

{
// Test abort before finished.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}, common.mustNotCall());
req.abort();
finished(req, common.mustCall(() => {
server.close();
}));
}));
}

{
// Test abort after request.

const server = http.createServer(function(req, res) {
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).end();
finished(req, (err) => {
common.expectsError({
type: Error,
code: 'ERR_STREAM_PREMATURE_CLOSE'
})(err);
finished(req, common.mustCall(() => {
server.close();
}));
});
req.abort();
}));
}

{
// Test abort before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
const req = http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
req.abort();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test destroy before end.

const server = http.createServer(function(req, res) {
res.write('test');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
res.destroy();
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}

{
// Test finish after end.

const server = http.createServer(function(req, res) {
res.end('asd');
});

server.listen(0, common.mustCall(function() {
http.request({
port: this.address().port
}).on('response', common.mustCall((res) => {
// TODO(ronag): Bug? Won't emit 'close' unless read.
res.on('data', () => {});
finished(res, common.mustCall(() => {
finished(res, common.mustCall(() => {
server.close();
}));
}));
})).end();
}));
}