Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Readable stream's awaitDrain increased incorrectly #27571

Closed
abbshr opened this issue May 5, 2019 · 1 comment
Closed

Readable stream's awaitDrain increased incorrectly #27571

abbshr opened this issue May 5, 2019 · 1 comment
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@abbshr
Copy link
Contributor

abbshr commented May 5, 2019

  • Version: (just test in) v10.x and above
  • Platform: (in fact whatever) Darwin hacker-machine 18.5.0 Darwin Kernel Version 18.5.0: Mon Mar 11 20:40:32 PDT 2019; root:xnu-4903.251.3~3/RELEASE_X86_64 x86_64
  • Subsystem: stream

I was working on a streaming protocol encoder/decoder, when I write test for it, I found something not right.

To make things clearly, I simplify the logic, these code can recurrent the problem:

const { PassThrough } = require("stream")

const encode = new PassThrough({ highWaterMark: 1 })
const decode = new PassThrough({ highWaterMark: 1 })

encode.pipe(decode)

function send(buf) {
  console.log("send", buf)
  encode.write(buf)
}

let i = 0
decode.on("data", d => {
  console.log("get data", d)

  if (++i === 2) {
    send(Buffer.from([0x3]))
    send(Buffer.from([0x4]))
  }
})

send(Buffer.from([0x1]))
send(Buffer.from([0x2]))

the output is:

send <Buffer 01>
get data <Buffer 01>
send <Buffer 02>
get data <Buffer 02>
send <Buffer 03>
send <Buffer 04>
get data <Buffer 03>

but what I expect is:

send <Buffer 01>
get data <Buffer 01>
send <Buffer 02>
get data <Buffer 02>
send <Buffer 03>
send <Buffer 04>
get data <Buffer 03>
get data <Buffer 04>

I guess it may be some problem related to eventloop tick. So I add process.nextTick:

decode.on("data", d => {
  console.log("get data", d)

  if (++i === 2) {
    process.nextTick(() => {
      send(Buffer.from([0x3]))
      send(Buffer.from([0x4]))
    })
  }
})

And this time it output what I expect.

Use NODE_DEBUG=stream to see what happened:

STREAM 28824: pipe count=1 opts=undefined
STREAM 28824: resume
STREAM 28824: resume
send <Buffer 01>
STREAM 28824: readableAddChunk <Buffer 01>
STREAM 28824: ondata
STREAM 28824: readableAddChunk <Buffer 01>
get data <Buffer 01>
STREAM 28824: dest.write false
STREAM 28824: false write response, pause 0
STREAM 28824: call pause flowing=true
STREAM 28824: pause
send <Buffer 02>
STREAM 28824: readableAddChunk <Buffer 02>
STREAM 28824: emitReadable false
STREAM 28824: resume false
STREAM 28824: read 0
STREAM 28824: need readable false
STREAM 28824: flow false
STREAM 28824: resume false
STREAM 28824: read 0
STREAM 28824: need readable true
STREAM 28824: length less than watermark true
STREAM 28824: do read
STREAM 28824: flow true
STREAM 28824: read undefined
STREAM 28824: need readable true
STREAM 28824: length less than watermark true
STREAM 28824: reading or ended false
STREAM 28824: pipeOnDrain 1
STREAM 28824: flow true
STREAM 28824: read undefined
STREAM 28824: need readable true
STREAM 28824: length less than watermark true
STREAM 28824: do read
STREAM 28824: ondata
STREAM 28824: readableAddChunk <Buffer 02>
get data <Buffer 02>
send <Buffer 03>
STREAM 28824: readableAddChunk <Buffer 03>
STREAM 28824: ondata
STREAM 28824: dest.write false
STREAM 28824: false write response, pause 0
STREAM 28824: call pause flowing=true
STREAM 28824: pause
send <Buffer 04>
STREAM 28824: readableAddChunk <Buffer 04>
STREAM 28824: emitReadable false
STREAM 28824: readableAddChunk <Buffer 03>
get data <Buffer 03>
STREAM 28824: dest.write false
STREAM 28824: false write response, pause 1
STREAM 28824: call pause flowing=false
STREAM 28824: emitReadable_ false 1 false
STREAM 28824: flow false
STREAM 28824: emitReadable_ false 1 false
STREAM 28824: flow false
STREAM 28824: maybeReadMore read 0
STREAM 28824: read 0
STREAM 28824: need readable true
STREAM 28824: length less than watermark true
STREAM 28824: do read
STREAM 28824: pipeOnDrain 2

we can notice that once the awaitDrain got greater than 1 on single pipe target, it's incorrect.

I then dig into the source code, and I know why.

In the first code block above, after first encoder.write() finished, encoder's readable side has been paused because the decoder's writable side write() return false. and next encoder.write() buffer the data in encoder's readable side and wait for a "drain" event in next tick from decoder's writable side.
When the first "drain" event emitted, encoder's readable side become flowing again, and "data" listener set by pipe() was called to dest.write().
From the output debug log and the source code, we can see the .write() call is synchronous, so before the write finished, in decoder's "data" listener, add additional 2 encoder.write() call, but at this time, encoder's writeable side's writing property has become false (because after data buffer in encoder's readable side, onwrite() was called to set writing to false), and so, the third encoder.write() write successfully to encoder's readable side, and
run into encoder's "data" event listener again, here because dest.write() return false, so it make the encoder's readable side paused, and set state.awaitDrain++, and when the third encoder.write() finished, return to the previous encoder's "data" listener, and it found dest.write() (which just finished) return false, so it increase the state.awaitDrain again! Now that we have a wrong counter. After that, we return to the last encoder.write(), and buffered in encoder's readable side again wait for decoder draining. next tick decoder drained, and encoder's state.awaitDrain--, but it was > 1 before, so it never could be 0, and the encoder's readable side was blocked forever.

The problem code is simplified as below:

src.on("data", (d) => {
  // in second `encoder.write()`
  const ret = dest.write(d)
       // inside the above dest.write() in third `encoder.write()`
       {
         const ret = dest.write(d)
         if (!ret) {state.awaitDrain++; state.pause()}
       }

  ...
  if (!ret) {state.awaitDrain++; state.pause()}
})

It seems the stream API document doesn't mention about this behavior.
There'r dozen of solution to fix this, But in fact, except puting .write() out of current tick, a better solution I thought is to keep the sycn behavior and prevent awaitDrain to increase by accident.

@lpinca lpinca added the stream Issues and PRs related to the stream subsystem. label May 5, 2019
@abbshr
Copy link
Contributor Author

abbshr commented Aug 27, 2019

fixed in #27572

@abbshr abbshr closed this as completed Aug 27, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants