Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fs: allow overriding fs for streams
Allow overriding open, write, and close when using createReadStream()
and createWriteStream().

PR-URL: #29083
Refs: #29050
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and targos committed Apr 28, 2020
1 parent 709d3e5 commit a15e712
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 41 deletions.
26 changes: 24 additions & 2 deletions doc/api/fs.md
Expand Up @@ -1674,6 +1674,10 @@ changes:
- version: v2.3.0
pr-url: https://github.com/nodejs/node/pull/1845
description: The passed `options` object can be a string now.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/29083
description: The `fs` options allow overriding the used `fs`
implementation.
-->

* `path` {string|Buffer|URL}
Expand All @@ -1688,7 +1692,8 @@ changes:
* `start` {integer}
* `end` {integer} **Default:** `Infinity`
* `highWaterMark` {integer} **Default:** `64 * 1024`
* Returns: {fs.ReadStream}
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.ReadStream} See [Readable Stream][].

Unlike the 16 kb default `highWaterMark` for a readable stream, the stream
returned by this method has a default `highWaterMark` of 64 kb.
Expand All @@ -1715,6 +1720,10 @@ By default, the stream will not emit a `'close'` event after it has been
destroyed. This is the opposite of the default for other `Readable` streams.
Set the `emitClose` option to `true` to change this behavior.

By providing the `fs` option it is possible to override the corresponding `fs`
implementations for `open`, `read` and `close`. When providing the `fs` option,
you must override `open`, `close` and `read`.

```js
const fs = require('fs');
// Create a stream from some character device.
Expand Down Expand Up @@ -1768,6 +1777,10 @@ changes:
- version: v2.3.0
pr-url: https://github.com/nodejs/node/pull/1845
description: The passed `options` object can be a string now.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/REPLACEME
description: The `fs` options allow overriding the used `fs`
implementation.
-->

* `path` {string|Buffer|URL}
Expand All @@ -1780,7 +1793,8 @@ changes:
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
* `start` {integer}
* Returns: {fs.WriteStream}
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.WriteStream} See [Writable Stream][].

`options` may also include a `start` option to allow writing data at
some position past the beginning of the file, allowed values are in the
Expand All @@ -1799,6 +1813,12 @@ By default, the stream will not emit a `'close'` event after it has been
destroyed. This is the opposite of the default for other `Writable` streams.
Set the `emitClose` option to `true` to change this behavior.

By providing the `fs` option it is possible to override the corresponding `fs`
implementations for `open`, `write`, `writev` and `close`. Overriding `write()`
without `writev()` can reduce performance as some optimizations (`_writev()`)
will be disabled. When providing the `fs` option, you must override `open`,
`close` and at least one of `write` and `writev`.

Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the
`path` argument and will use the specified file descriptor. This means that no
`'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s
Expand Down Expand Up @@ -5528,6 +5548,7 @@ the file contents.
[`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
[`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw
[`ReadStream`]: #fs_class_fs_readstream
[Readable Stream]: #stream_class_stream_readable
[`URL`]: url.html#url_the_whatwg_url_api
[`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size
[`WriteStream`]: #fs_class_fs_writestream
Expand Down Expand Up @@ -5587,3 +5608,4 @@ the file contents.
[chcp]: https://ss64.com/nt/chcp.html
[inode]: https://en.wikipedia.org/wiki/Inode
[support of file system `flags`]: #fs_file_system_flags
[Writable Stream]: #stream_class_stream_writable
128 changes: 91 additions & 37 deletions lib/internal/fs/streams.js
Expand Up @@ -11,6 +11,7 @@ const {
} = primordials;

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
Expand All @@ -27,6 +28,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;
const kFs = Symbol('kFs');

let pool;
// It can happen that we expect to read a large chunk of data, and reserve
Expand Down Expand Up @@ -75,6 +77,23 @@ function ReadStream(path, options) {
options.emitClose = false;
}

this[kFs] = options.fs || fs;

if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
this[kFs].open);
}

if (typeof this[kFs].read !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
this[kFs].read);
}

if (typeof this[kFs].close !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
this[kFs].close);
}

Readable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
Expand Down Expand Up @@ -124,7 +143,7 @@ ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
this[kFs].open(this.path, this.flags, this.mode, (er, fd) => {
if (er) {
if (this.autoClose) {
this.destroy();
Expand Down Expand Up @@ -174,42 +193,43 @@ ReadStream.prototype._read = function(n) {

// the actual read.
this[kIsPerformingIO] = true;
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
this[kFs].read(
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}
}

if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}
if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}

this.push(b);
}
});
this.push(b);
}
});

// Move the pool positions, and internal position for reading.
if (this.pos !== undefined)
Expand All @@ -233,7 +253,7 @@ ReadStream.prototype._destroy = function(err, cb) {
};

function closeFsStream(stream, cb, err) {
fs.close(stream.fd, (er) => {
stream[kFs].close(stream.fd, (er) => {
er = er || err;
cb(er);
stream.closed = true;
Expand Down Expand Up @@ -268,6 +288,40 @@ function WriteStream(path, options) {
options.emitClose = false;
}

this[kFs] = options.fs || fs;
if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
this[kFs].open);
}

if (!this[kFs].write && !this[kFs].writev) {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}

if (this[kFs].write && typeof this[kFs].write !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}

if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
this[kFs].writev);
}

if (typeof this[kFs].close !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
this[kFs].close);
}

// It's enough to override either, in which case only one will be used.
if (!this[kFs].write) {
this._write = null;
}
if (!this[kFs].writev) {
this._writev = null;
}

Writable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
Expand Down Expand Up @@ -313,7 +367,7 @@ WriteStream.prototype._final = function(callback) {
};

WriteStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, (er, fd) => {
this[kFs].open(this.path, this.flags, this.mode, (er, fd) => {
if (er) {
if (this.autoClose) {
this.destroy();
Expand All @@ -339,7 +393,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

this[kIsPerformingIO] = true;
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
Expand Down Expand Up @@ -383,7 +437,7 @@ WriteStream.prototype._writev = function(data, cb) {
}

this[kIsPerformingIO] = true;
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
Expand Down
13 changes: 11 additions & 2 deletions test/parallel/test-fs-read-stream.js
Expand Up @@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures');
const fn = fixtures.path('elipses.txt');
const rangeFile = fixtures.path('x.txt');

{
function test1(options) {
let paused = false;
let bytesRead = 0;

const file = fs.createReadStream(fn);
const file = fs.createReadStream(fn, options);
const fileSize = fs.statSync(fn).size;

assert.strictEqual(file.bytesRead, 0);
Expand Down Expand Up @@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt');
});
}

test1({});
test1({
fs: {
open: common.mustCall(fs.open),
read: common.mustCallAtLeast(fs.read, 1),
close: common.mustCall(fs.close),
}
});

{
const file = fs.createReadStream(fn, { encoding: 'utf8' });
file.length = 0;
Expand Down
38 changes: 38 additions & 0 deletions test/parallel/test-fs-write-stream-fs.js
@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const path = require('path');
const fs = require('fs');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

{
const file = path.join(tmpdir.path, 'write-end-test0.txt');
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: common.mustCallAtLeast(fs.write, 1),
close: common.mustCall(fs.close),
}
});
stream.end('asd');
stream.on('close', common.mustCall());
}


{
const file = path.join(tmpdir.path, 'write-end-test1.txt');
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: fs.write,
writev: common.mustCallAtLeast(fs.writev, 1),
close: common.mustCall(fs.close),
}
});
stream.write('asd');
stream.write('asd');
stream.write('asd');
stream.end();
stream.on('close', common.mustCall());
}

0 comments on commit a15e712

Please sign in to comment.