Skip to content

Commit

Permalink
fs: synchronize close with other I/O for streams
Browse files Browse the repository at this point in the history
Part of the flakiness in the
parallel/test-readline-async-iterators-destroy test comes from
fs streams starting `_read()` and `_destroy()` without waiting
for the other to finish, which can lead to the `fs.read()` call
resulting in `EBADF` if timing is bad.

Fix this by synchronizing write and read operations with `close()`.

Refs: #30660
  • Loading branch information
addaleax committed Dec 9, 2019
1 parent d7b8ae7 commit b1f7bf0
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions lib/internal/fs/streams.js
Expand Up @@ -10,7 +10,8 @@ const {
} = primordials;

const {
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
const internalUtil = require('internal/util');
const { validateNumber } = require('internal/validators');
Expand All @@ -22,6 +23,8 @@ const {
} = require('internal/fs/utils');
const { Readable, Writable } = require('stream');
const { toPathIfFileURL } = require('internal/url');
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;

Expand Down Expand Up @@ -86,6 +89,7 @@ function ReadStream(path, options) {
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
checkPosition(this.start, 'start');
Expand Down Expand Up @@ -155,6 +159,8 @@ ReadStream.prototype._read = function(n) {
});
}

if (this.destroyed) return;

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
Expand All @@ -178,7 +184,12 @@ ReadStream.prototype._read = function(n) {
return this.push(null);

// the actual read.
this[kIsPerformingIO] = true;
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
Expand Down Expand Up @@ -224,8 +235,12 @@ ReadStream.prototype._destroy = function(err, cb) {
return;
}

if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
return;
}

closeFsStream(this, cb, err);
this.fd = null;
};

function closeFsStream(stream, cb, err) {
Expand All @@ -236,6 +251,8 @@ function closeFsStream(stream, cb, err) {
if (!er)
stream.emit('close');
});

stream.fd = null;
}

ReadStream.prototype.close = function(cb) {
Expand Down Expand Up @@ -274,6 +291,7 @@ function WriteStream(path, options) {
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
this[kIsPerformingIO] = false;

if (this.start !== undefined) {
checkPosition(this.start, 'start');
Expand Down Expand Up @@ -339,7 +357,17 @@ WriteStream.prototype._write = function(data, encoding, cb) {
});
}

if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

this[kIsPerformingIO] = true;
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
if (this.autoClose) {
this.destroy();
Expand All @@ -362,7 +390,8 @@ WriteStream.prototype._writev = function(data, cb) {
});
}

const self = this;
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

const len = data.length;
const chunks = new Array(len);
let size = 0;
Expand All @@ -374,12 +403,22 @@ WriteStream.prototype._writev = function(data, cb) {
size += chunk.length;
}

fs.writev(this.fd, chunks, this.pos, function(er, bytes) {
this[kIsPerformingIO] = true;
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
self.destroy();
if (this.autoClose) {
this.destroy();
}
return cb(er);
}
self.bytesWritten += bytes;
this.bytesWritten += bytes;
cb();
});

Expand Down

0 comments on commit b1f7bf0

Please sign in to comment.