Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readableStream.tee() doesn't work when using process.nextTick(...) #39758

Closed
szmarczak opened this issue Aug 13, 2021 · 12 comments
Closed

readableStream.tee() doesn't work when using process.nextTick(...) #39758

szmarczak opened this issue Aug 13, 2021 · 12 comments
Labels
confirmed-bug Issues with confirmed bugs. web streams

Comments

@szmarczak
Copy link
Member

szmarczak commented Aug 13, 2021

Version

v16.6.2

Platform

Linux solus 5.13.8-191.current #1 SMP PREEMPT Fri Aug 6 11:29:58 UTC 2021 x86_64 GNU/Linux

Subsystem

stream/web

What steps will reproduce the bug?

import {ReadableStream} from 'stream/web';

let controller;
let start = c => (controller = c);
let pull = () => {
  if (pull.called) return;
  pull.called = true;

  process.nextTick(() => {
   controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]))

    process.nextTick(() => {
     controller.close()
    });
  });
};

const [a, b] = new ReadableStream({start, pull}).tee();

for (const stream of [a, b]) {
  const chunks = [];

  for await (const chunk of stream) {
    chunks.push(chunk);
  }

  console.log('received:', Buffer.concat(chunks).toString());
}

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior?

received: foobar
received: foobar

What do you see instead?

received: foobar
received:

Additional information

Deno for reference: https://github.com/denoland/deno/blob/a0285e2eb88f6254f6494b0ecd1878db3a3b2a58/ext/web/06_streams.js#L1465-L1543

/cc @ronag @mcollina @jasnell

@ronag
Copy link
Member

ronag commented Aug 13, 2021

@jasnell @nodejs/streams

@szmarczak
Copy link
Member Author

szmarczak commented Aug 15, 2021

If to replace process.nextTick with queueMicrotask in the example then it works. Maybe a bug (race condition) in the spec?

Edit:

The spec uses queueMicrotask but process.nextTick results in race condition, so we need to controller.close() when all other web streams work is done - Promise.resolve() should be sufficient enough in the example.

@ronag
Copy link
Member

ronag commented Aug 18, 2021

@jasnell

@jasnell
Copy link
Member

jasnell commented Aug 18, 2021

Yeah, I've got this queued up to look at this week. Just haven't yet had the opportunity.

@Mesteery Mesteery added confirmed-bug Issues with confirmed bugs. web streams labels Sep 25, 2021
@ronag
Copy link
Member

ronag commented Sep 26, 2021

@jasnell

@ronag
Copy link
Member

ronag commented Oct 28, 2021

@jasnell Sorry to bother. I think this is a rather bad bug. Any chance you will have time to look at it?

@ofirbarak
Copy link
Contributor

Hi, I'll work on it

@jasnell
Copy link
Member

jasnell commented Oct 29, 2021

Sorry I've been buried and haven't had the opportunity to dig in. It's not a bug in the spec, keep in mind that nextTick is a Node specific concept.

@ronag
Copy link
Member

ronag commented Oct 29, 2021

@szmarczak We had this issue on undici and there we don't really use nextTick in this context? i.e. we always wrap controller.close in a microtask to ensure this doesn't happen (https://github.com/nodejs/undici/blob/main/lib/fetch/util.js#L153-L155). Is it just related to nextTick?

@szmarczak
Copy link
Member Author

Well Node.js streams use nextTick so that's where the incompatibility comes from. I'm thinking of the finished function exactly.

@RafaelGSS
Copy link
Member

RafaelGSS commented Nov 21, 2021

I've created a PR to address it. However, just to bring more context to the described issue:

import {ReadableStream} from 'stream/web';

let controller;
let start = c => (controller = c);
let pull = () => {
  if (pull.called) return;
  pull.called = true;

  process.nextTick(() => {
   controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]))

    process.nextTick(() => {
     controller.close()
    });
  });
};

const [a, b] = new ReadableStream({start, pull}).tee();

for (const stream of [a, b]) {
  const chunks = [];

  for await (const chunk of stream) {
    chunks.push(chunk);
  }

  console.log('received:', Buffer.concat(chunks).toString());
}

As mentioned by @szmarczak it returns:

received: foobar
received:

On ES Modules, however, when you change it to commonjs it works as expected:

- import {ReadableStream} from 'stream/web';
+ const {ReadableStream} = require('stream/web');
received: foobar
received: foobar

The #40901 aims to solve it

@MoonBall
Copy link
Member

MoonBall commented Jan 4, 2022

This issue had been solved. Can we close it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
confirmed-bug Issues with confirmed bugs. web streams
Projects
None yet
Development

No branches or pull requests

7 participants