diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index decf5327c984ab..35d93f7d61cbca 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -172,6 +172,10 @@ ReadStream.prototype._read = function(n) { } this.emit('error', er); } else if (bytesRead > 0) { + if (this.pos !== undefined) { + this.pos += bytesRead; + } + this.bytesRead += bytesRead; if (bytesRead !== buf.length) { @@ -188,10 +192,6 @@ ReadStream.prototype._read = function(n) { this.push(null); } }); - - if (this.pos !== undefined) { - this.pos += n; - } }; ReadStream.prototype._destroy = function(err, cb) { diff --git a/test/parallel/test-fs-read-stream-pos.js b/test/parallel/test-fs-read-stream-pos.js new file mode 100644 index 00000000000000..c9470cb23ddeb6 --- /dev/null +++ b/test/parallel/test-fs-read-stream-pos.js @@ -0,0 +1,69 @@ +'use strict'; + +// Refs: https://github.com/nodejs/node/issues/33940 + +const common = require('../common'); +const tmpdir = require('../common/tmpdir'); +const fs = require('fs'); +const assert = require('assert'); +const path = require('path'); + +tmpdir.refresh(); + +const file = path.join(tmpdir.path, '/read_stream_pos_test.txt'); + +fs.writeFileSync(file, ''); + +let counter = 0; + +setInterval(() => { + counter = counter + 1; + const line = `hello at ${counter}\n`; + fs.writeFileSync(file, line, { flag: 'a' }); +}, 1); + +const hwm = 10; +let bufs = []; +let isLow = false; +let cur = 0; +let stream; + +setInterval(() => { + if (stream) return; + + stream = fs.createReadStream(file, { + highWaterMark: hwm, + start: cur + }); + stream.on('data', common.mustCallAtLeast((chunk) => { + cur += chunk.length; + bufs.push(chunk); + if (isLow) { + const brokenLines = Buffer.concat(bufs).toString() + .split('\n') + .filter((line) => { + const s = 'hello at'.slice(0, line.length); + if (line && !line.startsWith(s)) { + return true; + } + return false; + }); + assert.strictEqual(brokenLines.length, 0); + process.exit(); + return; + } + if (chunk.length !== hwm) { + isLow = true; + } + })); + stream.on('end', () => { + stream = null; + isLow = false; + bufs = []; + }); +}, 10); + +// Time longer than 90 seconds to exit safely +setTimeout(() => { + process.exit(); +}, 90000);