From 578d12fa10502cfe2c2fe370c5f0666ad35037b3 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Dec 2019 14:17:04 +0100 Subject: [PATCH] fs: synchronize close with other I/O for streams Part of the flakiness in the parallel/test-readline-async-iterators-destroy test comes from fs streams starting `_read()` and `_destroy()` without waiting for the other to finish, which can lead to the `fs.read()` call resulting in `EBADF` if timing is bad. Fix this by synchronizing write and read operations with `close()`. Refs: https://github.com/nodejs/node/issues/30660 PR-URL: https://github.com/nodejs/node/pull/30837 Reviewed-By: Luigi Pinca Reviewed-By: Ben Noordhuis Reviewed-By: Matteo Collina Reviewed-By: Rich Trott --- lib/internal/fs/streams.js | 52 +++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index ada5dc829859c3..b7189573416899 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -7,10 +7,12 @@ const { NumberIsSafeInteger, ObjectDefineProperty, ObjectSetPrototypeOf, + Symbol, } = primordials; const { - ERR_OUT_OF_RANGE + ERR_OUT_OF_RANGE, + ERR_STREAM_DESTROYED } = require('internal/errors').codes; const { validateNumber } = require('internal/validators'); const fs = require('fs'); @@ -21,6 +23,8 @@ const { } = require('internal/fs/utils'); const { Readable, Writable } = require('stream'); const { toPathIfFileURL } = require('internal/url'); +const kIoDone = Symbol('kIoDone'); +const kIsPerformingIO = Symbol('kIsPerformingIO'); const kMinPoolSpace = 128; @@ -85,6 +89,7 @@ function ReadStream(path, options) { this.pos = undefined; this.bytesRead = 0; this.closed = false; + this[kIsPerformingIO] = false; if (this.start !== undefined) { checkPosition(this.start, 'start'); @@ -143,6 +148,8 @@ ReadStream.prototype._read = function(n) { }); } + if (this.destroyed) return; + if (!pool || pool.length - pool.used < kMinPoolSpace) { // Discard the old pool. allocNewPool(this.readableHighWaterMark); @@ -166,7 +173,12 @@ ReadStream.prototype._read = function(n) { return this.push(null); // the actual read. + this[kIsPerformingIO] = true; fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) return this.emit(kIoDone, er); + if (er) { if (this.autoClose) { this.destroy(); @@ -212,8 +224,12 @@ ReadStream.prototype._destroy = function(err, cb) { return; } + if (this[kIsPerformingIO]) { + this.once(kIoDone, (er) => closeFsStream(this, cb, err || er)); + return; + } + closeFsStream(this, cb, err); - this.fd = null; }; function closeFsStream(stream, cb, err) { @@ -224,6 +240,8 @@ function closeFsStream(stream, cb, err) { if (!er) stream.emit('close'); }); + + stream.fd = null; } ReadStream.prototype.close = function(cb) { @@ -262,6 +280,7 @@ function WriteStream(path, options) { this.pos = undefined; this.bytesWritten = 0; this.closed = false; + this[kIsPerformingIO] = false; if (this.start !== undefined) { checkPosition(this.start, 'start'); @@ -316,7 +335,17 @@ WriteStream.prototype._write = function(data, encoding, cb) { }); } + if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); + + this[kIsPerformingIO] = true; fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) { + cb(er); + return this.emit(kIoDone, er); + } + if (er) { if (this.autoClose) { this.destroy(); @@ -339,7 +368,8 @@ WriteStream.prototype._writev = function(data, cb) { }); } - const self = this; + if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); + const len = data.length; const chunks = new Array(len); let size = 0; @@ -351,12 +381,22 @@ WriteStream.prototype._writev = function(data, cb) { size += chunk.length; } - fs.writev(this.fd, chunks, this.pos, function(er, bytes) { + this[kIsPerformingIO] = true; + fs.writev(this.fd, chunks, this.pos, (er, bytes) => { + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) { + cb(er); + return this.emit(kIoDone, er); + } + if (er) { - self.destroy(); + if (this.autoClose) { + this.destroy(); + } return cb(er); } - self.bytesWritten += bytes; + this.bytesWritten += bytes; cb(); });