stream: pipeline should use req.abort() to destroy response #31054

Closed · wants to merge 4 commits
Changes from 3 commits
15 changes: 3 additions & 12 deletions lib/internal/streams/pipeline.js
@@ -17,7 +17,7 @@ const {
 } = require('internal/errors').codes;
 
 function isRequest(stream) {
-  return stream.setHeader && typeof stream.abort === 'function';
+  return stream && stream.setHeader && typeof stream.abort === 'function';
 }
 
 function destroyer(stream, reading, writing, callback) {
@@ -43,22 +43,13 @@ function destroyer(stream, reading, writing, callback) {
 
     // request.destroy just do .end - .abort is what we want
     if (isRequest(stream)) return stream.abort();
-    if (typeof stream.destroy === 'function') {
-      if (stream.req && stream._writableState === undefined) {
-        // This is a ClientRequest
-        // TODO(mcollina): backward compatible fix to avoid crashing.
-        // Possibly remove in a later semver-major change.
-        stream.req.on('error', noop);
-      }
-      return stream.destroy(err);
-    }
+    if (isRequest(stream.req)) return stream.req.abort();
+    if (typeof stream.destroy === 'function') return stream.destroy(err);
 
     callback(err || new ERR_STREAM_DESTROYED('pipe'));
   };
 }
 
-function noop() {}
-
 function pipe(from, to) {
   return from.pipe(to);
 }
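For readers skimming the diff, here is a consolidated, illustrative sketch of the resulting destroyer behavior (the function name destroyStream is made up for the example; this is not the library code). A ClientRequest is recognized by exposing setHeader() and abort(), and an http.IncomingMessage carries its originating request on .req, so destroying the response inside pipeline() is routed through req.abort() rather than only res.destroy():

// Illustrative sketch, not lib/internal/streams/pipeline.js itself.
function isRequest(stream) {
  // A ClientRequest exposes setHeader() and abort().
  return stream && stream.setHeader && typeof stream.abort === 'function';
}

function destroyStream(stream, err) {
  // Outgoing request: destroy() would merely end it, so abort instead.
  if (isRequest(stream)) return stream.abort();
  // Incoming response: abort the request it came from (the fix in this PR).
  if (isRequest(stream.req)) return stream.req.abort();
  // Anything else: fall back to a regular destroy(err).
  if (typeof stream.destroy === 'function') return stream.destroy(err);
}

The stream.req check also explains the new stream && guard in isRequest(): stream.req is undefined for non-HTTP streams, and the helper must not throw in that case.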
35 changes: 34 additions & 1 deletion test/parallel/test-stream-pipeline.js
@@ -1,7 +1,14 @@
 'use strict';
 
 const common = require('../common');
-const { Stream, Writable, Readable, Transform, pipeline } = require('stream');
+const {
+  Stream,
+  Writable,
+  Readable,
+  Transform,
+  pipeline,
+  PassThrough
+} = require('stream');
 const assert = require('assert');
 const http = require('http');
 const { promisify } = require('util');
@@ -483,3 +490,29 @@ const { promisify } = require('util');
     { code: 'ERR_INVALID_CALLBACK' }
   );
 }
+
+{
+  const server = http.Server(function(req, res) {
+    res.write('asd');
+  });
+  server.listen(0, function() {
+    http.get({ port: this.address().port }, (res) => {
+      const stream = new PassThrough();
+
+      stream.on('error', common.mustCall());
+
+      pipeline(
+        res,
+        stream,
+        common.mustCall((err) => {
+          assert.ok(err);
+          // TODO
+          // assert.strictEqual(err.message, 'oh no');
+          server.close();

Review conversation on this line (marked as resolved by ronag):

Member:
Does it make sense to also validate the error here?

Member:
I agree. At least that err is truthy.

@ronag (Member, Author), Dec 25, 2019:
Fixed. Unfortunately we get an unexpected/different error (ERR_STREAM_DESTROYED) due to the behavior discussed in #31060. So I made it just check for truthiness.

@lpinca (Member), Dec 25, 2019:
I can't understand how this is related to #31060. AFAIK none of the _destroy() implementations is async.

It seems like a regression on master to me:

const stream = require('stream');

const data = Buffer.alloc(1024);

{
  // Case 1: plain .pipe() with an 'error' listener on the writable.
  const readable = new stream.Readable({
    read() {
      this.push(data);
    }
  });

  const writable = new stream.Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });

  writable.on('error', console.error);

  readable.pipe(writable);
  writable.destroy(new Error('Oops'));
}

{
  // Case 2: the same streams wired up with stream.pipeline() and a callback.
  const readable = new stream.Readable({
    read() {
      this.push(data);
    }
  });

  const writable = new stream.Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });

  stream.pipeline(readable, writable, console.error);

  writable.destroy(new Error('Oops'));
}

This prints

Error: Oops
    at Object.<anonymous> (/Users/luigi/Desktop/pipe.js:23:20)
    at Module._compile (internal/modules/cjs/loader.js:1139:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1159:10)
    at Module.load (internal/modules/cjs/loader.js:988:32)
    at Function.Module._load (internal/modules/cjs/loader.js:896:14)
    at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:71:12)
    at internal/main/run_main_module.js:17:47
Error: Oops
    at Object.<anonymous> (/Users/luigi/Desktop/pipe.js:41:20)
    at Module._compile (internal/modules/cjs/loader.js:1139:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1159:10)
    at Module.load (internal/modules/cjs/loader.js:988:32)
    at Function.Module._load (internal/modules/cjs/loader.js:896:14)
    at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:71:12)
    at internal/main/run_main_module.js:17:47

on Node.js 13.5.0 and

Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
    at Writable.write (_stream_writable.js:321:17)
    at Readable.ondata (_stream_readable.js:779:22)
    at Readable.emit (events.js:320:20)
    at Readable.read (_stream_readable.js:579:10)
    at flow (_stream_readable.js:1052:34)
    at resume_ (_stream_readable.js:1033:3)
    at processTicksAndRejections (internal/process/task_queues.js:84:21) {
  code: 'ERR_STREAM_DESTROYED'
}
Error [ERR_STREAM_DESTROYED]: Cannot call write after a stream was destroyed
    at Writable.write (_stream_writable.js:321:17)
    at Readable.ondata (_stream_readable.js:779:22)
    at Readable.emit (events.js:320:20)
    at Readable.read (_stream_readable.js:579:10)
    at flow (_stream_readable.js:1052:34)
    at resume_ (_stream_readable.js:1033:3)
    at processTicksAndRejections (internal/process/task_queues.js:84:21) {
  code: 'ERR_STREAM_DESTROYED'
}

on Node.js master

Member:
Bisecting points to 67ed526.

@lpinca (Member), Dec 25, 2019:
If that _destroy() completed with an error, yes, totally.

Member (Author):
Cool. I'll add it to the proposals list.

Member:
How can this thread be resolved? We need a fix for this, along with the bug this PR aims to solve, for Node v13; or better, revert the few commits that caused the problem in the first place.

@ronag (Member, Author), Dec 25, 2019:
I don't think this specific thread needs to be urgently resolved (or at the least it's a different issue). This PR in its current form resolves the original issue.

Member:
Agreed, this discussion should not block the PR. The issue discussed here is caused by a semver-major change that will not be included in v13.x.

+        })
+      );
+
+      stream.destroy(new Error('oh no'));
+    }).on('error', common.mustNotCall());
+  });
+}
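
Outside the test harness, a minimal hypothetical usage sketch of the behavior this patch targets (the variable names are illustrative, and common.mustCall()/mustNotCall() from the test above are replaced with plain callbacks). Destroying a later stream in the pipeline should settle the pipeline callback with an error and abort the underlying ClientRequest instead of leaving the HTTP request hanging:

'use strict';
// Hypothetical standalone sketch (not part of the PR); assumes the patched
// pipeline() behavior where destroying `sink` also aborts res.req.
const http = require('http');
const { PassThrough, pipeline } = require('stream');

const server = http.createServer((req, res) => {
  res.write('asd'); // keep the response open; never call end()
});

server.listen(0, () => {
  http.get({ port: server.address().port }, (res) => {
    const sink = new PassThrough();
    sink.on('error', () => {}); // the teardown error is also emitted here

    pipeline(res, sink, (err) => {
      // err is expected to be truthy because the pipeline was torn down.
      console.log('pipeline settled with:', err && (err.code || err.message));
      server.close();
    });

    sink.destroy(new Error('oh no'));
  });
});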