
stream: propagate errors from src streams in async iterator #30861

Closed

Conversation

marcosc90
Contributor

This should fix errors not being propagated when using async iterators on a piped stream: if an error occurs in one of the sources in the pipe chain/pipeline, it is currently very difficult and awkward to handle.

Piped streams now keep track of their sources, so that when Symbol.asyncIterator is called on the destination stream, an error handler can be attached to each source and trigger an error on the stream being iterated.

The 3 added tests fail on master.
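
Roughly, the mechanism looks like the following sketch (a simplified illustration, not the actual patch; pipeSources is the internal list this PR introduces, and the helper names here are made up):

// Sketch: pipe() records each source on the destination...
function recordPipeSource(src, dst) {
  dst.pipeSources = (src.pipeSources || []).concat(src);
}

// ...and when Symbol.asyncIterator is called, every recorded source's
// 'error' event is forwarded to the stream being iterated, so the
// for await loop rejects instead of hanging.
function attachSourceErrorHandlers(stream) {
  for (const src of stream.pipeSources || []) {
    src.once('error', (err) => stream.destroy(err));
  }
}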

Fixes: #28194

Checklist
  • make -j4 test (UNIX), or vcbuild test (Windows) passes
  • tests and/or benchmarks are included
  • commit message follows commit guidelines

@nodejs-github-bot added the stream (Issues and PRs related to the stream subsystem) label Dec 9, 2019
@Trott
Member

Trott commented Dec 9, 2019

@nodejs/streams

@mcollina
Member

mcollina left a comment

Thanks for your contribution. After some thought, I do not think we should have special behavior for async iterators when piping. The pipe() operator does not forward errors, and adding that behavior for async iterators would expose subtle bugs that are very hard to track.

My recommendation is to focus these efforts on pipeline, which forwards errors. We should add native support for async iterators there, like so:

const { pipeline } = require('stream')
const { createReadStream, createWriteStream } = require('fs')

pipeline(
  createReadStream('./big-file'),
  async function * transform(source) {
    for await (let chunk of source) {
      yield chunk.toString().toUpperCase()
    }
  },
  createWriteStream('./dest'),
  (err) => {
    if (err) console.log(err)
  }
)

Or something even easier to read.

@marcosc90
Contributor Author

marcosc90 commented Dec 9, 2019

The pipe() operator does not forward errors, and adding that behavior for async iterators would expose subtle bugs that are very hard to track.

I understand your point, but the async iterator API is already buggy, and this change only affects the async iterator API. The only exception is the pushing to pipeSources, which should not cause any bugs since it is not used anywhere other than in async iterators.

I doubt anyone would argue that the current implementation of async iterators is not buggy, or at least not weird. The following code is not what anyone would expect when working with promises/async iterators, especially because, unless you propagate the errors yourself as in the first snippet below, the promise returned by print will never settle if an error is thrown anywhere other than in the last piped stream. A promise that never settles is not something that belongs in core.

const fs = require('fs');
const request = require('request');
const { PassThrough } = require('stream')

async function print() {

    const read = fs.createReadStream('/tmp/some.json');
    const req = request('http://example.com');
    const req2 = request('http://example.com/two');
    const req3 = request('http://example.com/three');

    const stream = new PassThrough();
    
    read.on('error', (err) => stream.emit('error', err));
    req.on('error', (err) => stream.emit('error', err));
    req2.on('error', (err) => stream.emit('error', err));
    req3.on('error', (err) => stream.emit('error', err));

    const iterator = read
        .pipe(req)
        .pipe(req2)
        .pipe(req3)
        .pipe(stream)

    for await (const k of iterator) {
        console.log(k);
    }
}

vs

async function print() {
    
    const iterator = fs.createReadStream('/tmp/some.json')
        .pipe(request('http://example.com'))
        .pipe(request('http://example.com/two'))
        .pipe(request('http://example.com/three'))
        .pipe(new PassThrough())

    for await (const k of iterator) {
        console.log(k);
    }
}

Promise support is an important part of the latest Node.js releases, and I think promises should be supported correctly here.

My recommendation is to focus these efforts on pipeline which forward errors. We should add native support for async iterators there, like so:

This PR also fixes the same issue for pipeline, which does not work with async iterators either. I agree that we should also add support for your example.

@ronag
Member

ronag commented Dec 9, 2019

I doubt anyone would argue that the current implementation of async iterators is not buggy

I think it's consistent... pipe does not forward errors... so why would async iterators? Whether that's the best way or not is a discussion that, I think, comes a few years too late.

Maybe we can do something for this use case through pipeline, e.g.

for await (const k of pipeline(
  fs.createReadStream('/tmp/some.json'),
  request('http://example.com'),
  request('http://example.com/two'),
  request('http://example.com/three')
)) {
  console.log(k);
}

i.e. pipeline could return an async generator, or something.

@mcollina
Member

mcollina commented Dec 9, 2019

I understand your point, but the async iterator API is already buggy, and this change only affects the async iterator API. The only exception is the pushing to pipeSources, which should not cause any bugs since it is not used anywhere other than in async iterators.

Can you please expand on why you state that the async iterator API is buggy? It merely reflects how streams operate. Streams have suffered from this problem for a long time; however, we cannot change them due to backward compatibility.


The following works as expected right now:

async function print() {
    const iterator = pipeline(
        fs.createReadStream('/tmp/some.json'),
        request('http://example.com'),
        request('http://example.com/two'),
        request('http://example.com/three'),
        new PassThrough(),
        () => {});

    for await (const k of iterator) {
        console.log(k);
    }
}

If that does not work, it would be good to open an issue with a way to reproduce and/or a fix that does not involve changing pipe().

Moreover, I know we can improve on this API and I would love to see further developments to simplify things.

@ronag
Member

ronag commented Dec 9, 2019

The following works as expected right now:

Oh, I missed that! Would be nice if the callback was optional.

@mcollina
Member

mcollina commented Dec 9, 2019

@ronag See the discussion in #21054. I'd be happy with something that removed that callback while still avoiding uncaught exceptions. I agree that in the case of async iteration the callback is really annoying.
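
As a side note, for the non-iterating case a callback-free pipeline is already possible today via util.promisify (a sketch, not part of this PR; it resolves/rejects instead of taking a callback, though it does not return an iterable):

const fs = require('fs');
const stream = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(stream.pipeline);

async function copy() {
  // Rejects if any stream in the chain errors; no callback needed.
  await pipelineAsync(
    fs.createReadStream('./big-file'),
    fs.createWriteStream('./dest')
  );
}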

@marcosc90
Contributor Author

marcosc90 commented Dec 9, 2019

The following works as expected right now:

async function print() {
    const iterator = pipeline(
        fs.createReadStream('/tmp/some.json'),
        request('http://example.com'),
        request('http://example.com/two'),
        request('http://example.com/three'),
        new PassThrough(),
        () => {});

    for await (const k of iterator) {
        console.log(k);
    }
}

This does not work as one would expect: if any of those streams fails, print will resolve without iterating. If something fails when working with promises, it should reject, imo.

Can you please expand on why you state that the async iterator API is buggy? It merely reflects how streams operate. Streams have suffered from this problem for a long time; however, we cannot change them due to backward compatibility.

In my opinion the promise API should propagate errors correctly; the current behaviour is not obvious. In any case, if backward compatibility is the issue, we could add an option to .pipe to make this easier for the user.

async function print() {
    
    const iterator = fs.createReadStream('/tmp/some.json')
        .pipe(request('http://example.com'))
        .pipe(new PassThrough(), { wrapErrors: true }) // propagateErrors, errors or whatever name

    for await (const k of iterator) {
        console.log(k);
    }
}

Instead of this being a "fix", it would be a new feature. What do you think?
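
For illustration, a rough sketch of what such an opt-in could do under the hood (the option name and this helper are purely hypothetical; nothing like this exists in core):

// Hypothetical: what .pipe(dest, { wrapErrors: true }) could do internally:
// forward 'error' from the source to the destination, so an async iterator
// on the destination rejects instead of hanging.
function pipeWrappingErrors(src, dst) {
  src.on('error', (err) => dst.destroy(err));
  return src.pipe(dst);
}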

@ronag
Member

ronag commented Dec 9, 2019

This does not work as one would expect: if any of those streams fails, print will resolve without iterating. If something fails when working with promises, it should reject, imo.

I agree with this.

The following works as expected right now:

Actually it doesn't work like I expected. It just returns the last stream.

@ronag
Member

ronag commented Dec 9, 2019

I don't think this PR is the way to go. I think we have a bug somewhere else.

See this failing test:

async function test () {
  const iterator = pipeline(
    new Readable({
      read() {
        // pipeline should propagate this to the next stream through its
        // destroy(err) cleanup.
        this.destroy(new Error('err'));
      }
    }),
    new PassThrough(),
    () => {}
  )

  for await (const k of iterator) {
    console.log(k);
  }
}
test().catch(common.mustCall());

I think this should work as it is right now.

@ronag
Member

ronag commented Dec 9, 2019

@marcosc90: A potential workaround:

function pipeline2(...streams) {
  const pt = new PassThrough();
  for (const stream of streams) {
    stream.on('error', err => {
      pt.destroy(err);
    });
  }
  pipeline(...streams, pt, () => {});
  return pt;
}
async function print() {
    const iterator = pipeline2(
        fs.createReadStream('/tmp/some.json'),
        request('http://example.com'),
        request('http://example.com/two'),
        request('http://example.com/three'),
        new PassThrough());

    for await (const k of iterator) {
        console.log(k);
    }
}

@marcosc90
Contributor Author

marcosc90 commented Dec 9, 2019

I don't think this PR is the way to go. I think we have a bug somewhere else.

See this failing test:

async function test () {
  const iterator = pipeline(
    [
      new Readable({
        read() {
          // pipeline should propagate this to the next stream through its
          // destroy(err) cleanup.
          this.destroy(new Error('err'));
        }
      }),
      new PassThrough()
    ],
    () => {}
  )

  for await (const k of iterator) {
    console.log(k);
  }
}
test().catch(common.mustCall());

I think this should work as it is right now.

My PR fixes this: .catch is called in my branch.

@mcollina
Member

mcollina commented Dec 9, 2019

Here is a fix to the pipeline issue: #30869.

@marcosc90
Contributor Author

Here is a fix to the pipeline issue: #30869.

Thanks, this solves half the issue 🎉

I'd like to continue the discussion about adding an optional argument, or some other alternative, so that a pipe chain works well with promises.

mcollina added a commit to mcollina/node that referenced this pull request Dec 12, 2019
This change makes all streams in a pipeline emit 'error' in
case of an abnormal termination of the pipeline. If the last stream
is currently being async iterated, this change will make the iteration
reject accordingly.

See: nodejs#30861
Fixes: nodejs#28194
mcollina added a commit that referenced this pull request Dec 14, 2019

PR-URL: #30869
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
MylesBorins pushed a commit that referenced this pull request Dec 17, 2019
@BridgeAR
Member

@mcollina @ronag @nodejs/streams please take another look at #30861 (comment)

@ronag
Copy link
Member

ronag commented Dec 25, 2019

I'd like to continue the discussion about adding an optional argument, or some other alternative, so that a pipe chain works well with promises.

I'm not following here. What's wrong with pipeline?

@marcosc90
Contributor Author

I think both APIs should play well with async iterators. Since I don't see any support for this, I'll close it and use pipeline with a mandatory callback as a workaround.

@marcosc90 marcosc90 closed this Dec 26, 2019
targos pushed a commit to targos/node that referenced this pull request Apr 25, 2020
codebytere pushed a commit that referenced this pull request Jun 6, 2020