Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: remove custom Buffer pool for streams #33981

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
100 changes: 30 additions & 70 deletions lib/internal/fs/streams.js
Expand Up @@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url');
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;
const kFs = Symbol('kFs');

let pool;
// It can happen that we expect to read a large chunk of data, and reserve
// a large chunk of the pool accordingly, but the read() call only filled
// a portion of it. If a concurrently executing read() then uses the same pool,
// the "reserved" portion cannot be used, so we allow it to be re-used as a
// new pool later.
const poolFragments = [];

function allocNewPool(poolSize) {
if (poolFragments.length > 0)
pool = poolFragments.pop();
else
pool = Buffer.allocUnsafe(poolSize);
pool.used = 0;
}

function roundUpToMultipleOf8(n) {
return (n + 7) & ~7; // Align to 8 byte boundary.
}

function _construct(callback) {
const stream = this;
if (typeof stream.fd === 'number') {
Expand Down Expand Up @@ -188,70 +167,51 @@ ReadStream.prototype.open = openReadFs;
ReadStream.prototype._construct = _construct;

ReadStream.prototype._read = function(n) {
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
}
n = this.pos !== undefined ?
MathMin(this.end - this.pos + 1, n) :
MathMin(this.end - this.bytesRead + 1, n);

// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
const thisPool = pool;
let toRead = MathMin(pool.length - pool.used, n);
const start = pool.used;

if (this.pos !== undefined)
toRead = MathMin(this.end - this.pos + 1, toRead);
else
toRead = MathMin(this.end - this.bytesRead + 1, toRead);
if (n <= 0) {
this.push(null);
return;
}

// Already read everything we were supposed to read!
// treat as EOF.
if (toRead <= 0)
return this.push(null);
const buf = Buffer.allocUnsafeSlow(n);

// the actual read.
this[kIsPerformingIO] = true;
this[kFs]
.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
.read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
this[kIsPerformingIO] = false;

// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);
if (this.destroyed) {
this.emit(kIoDone, er);
return;
}

if (er) {
errorOrDestroy(this, er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}

if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
} else if (bytesRead > 0) {
this.bytesRead += bytesRead;

if (bytesRead !== buf.length) {
// Slow path. Shrink to fit.
// Copy instead of slice so that we don't retain
// large backing buffer for small reads.
const dst = Buffer.allocUnsafeSlow(bytesRead);
buf.copy(dst, 0, 0, bytesRead);
buf = dst;
}

this.push(b);
this.push(buf);
} else {
this.push(null);
}
});

// Move the pool positions, and internal position for reading.
if (this.pos !== undefined)
this.pos += toRead;

pool.used = roundUpToMultipleOf8(pool.used + toRead);
if (this.pos !== undefined) {
this.pos += n;
}
};

ReadStream.prototype._destroy = function(err, cb) {
Expand Down
Expand Up @@ -2,6 +2,9 @@
'use strict';
// Refs: https://github.com/nodejs/node/issues/31733
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const assert = require('assert');
const crypto = require('crypto');
const fs = require('fs');
Expand Down Expand Up @@ -121,7 +124,6 @@ function test(config) {

tmpdir.refresh();

// OK
test({
cipher: 'aes-128-ccm',
aad: Buffer.alloc(1),
Expand All @@ -131,7 +133,6 @@ test({
plaintextLength: 32768,
});

// Fails the fstream test.
test({
cipher: 'aes-128-ccm',
aad: Buffer.alloc(1),
Expand Down