Skip to content

Commit

Permalink
fs: remove custom Buffer pool for streams
Browse files Browse the repository at this point in the history
The performance benefit of using a custom pool are negligable.
Furthermore, it causes problems with Workers and transferrable.
Rather than further adding complexity for compat with Workers,
just remove the pooling logic.

Refs: #33880 (comment)
Fixes: #31733

PR-URL: #33981
Backport-PR-URL: #38397
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Ben Noordhuis <[email protected]>
  • Loading branch information
ronag authored and targos committed Apr 26, 2021
1 parent f62b138 commit 443cace
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 74 deletions.
102 changes: 31 additions & 71 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url');
const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;
const kFs = Symbol('kFs');

let pool;
// It can happen that we expect to read a large chunk of data, and reserve
// a large chunk of the pool accordingly, but the read() call only filled
// a portion of it. If a concurrently executing read() then uses the same pool,
// the "reserved" portion cannot be used, so we allow it to be re-used as a
// new pool later.
const poolFragments = [];

function allocNewPool(poolSize) {
if (poolFragments.length > 0)
pool = poolFragments.pop();
else
pool = Buffer.allocUnsafe(poolSize);
pool.used = 0;
}

function roundUpToMultipleOf8(n) {
return (n + 7) & ~7; // Align to 8 byte boundary.
}

function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
Expand Down Expand Up @@ -165,73 +144,54 @@ ReadStream.prototype._read = function(n) {

if (this.destroyed) return;

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
}
n = this.pos !== undefined ?
MathMin(this.end - this.pos + 1, n) :
MathMin(this.end - this.bytesRead + 1, n);

// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
const thisPool = pool;
let toRead = MathMin(pool.length - pool.used, n);
const start = pool.used;

if (this.pos !== undefined)
toRead = MathMin(this.end - this.pos + 1, toRead);
else
toRead = MathMin(this.end - this.bytesRead + 1, toRead);
if (n <= 0) {
this.push(null);
return;
}

// Already read everything we were supposed to read!
// treat as EOF.
if (toRead <= 0)
return this.push(null);
const buf = Buffer.allocUnsafeSlow(n);

// the actual read.
this[kIsPerformingIO] = true;
this[kFs].read(
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kFs]
.read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => {
this[kIsPerformingIO] = false;

// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);
if (this.destroyed) {
this.emit(kIoDone, er);
return;
}

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}

if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
} else if (bytesRead > 0) {
this.bytesRead += bytesRead;

if (bytesRead !== buf.length) {
// Slow path. Shrink to fit.
// Copy instead of slice so that we don't retain
// large backing buffer for small reads.
const dst = Buffer.allocUnsafeSlow(bytesRead);
buf.copy(dst, 0, 0, bytesRead);
buf = dst;
}

this.push(b);
this.push(buf);
} else {
this.push(null);
}
});

// Move the pool positions, and internal position for reading.
if (this.pos !== undefined)
this.pos += toRead;

pool.used = roundUpToMultipleOf8(pool.used + toRead);
if (this.pos !== undefined) {
this.pos += n;
}
};

ReadStream.prototype._destroy = function(err, cb) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/* eslint-disable node-core/crypto-check */
'use strict';
// Refs: https://github.com/nodejs/node/issues/31733
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const assert = require('assert');
const crypto = require('crypto');
const fs = require('fs');
Expand Down Expand Up @@ -121,7 +123,6 @@ function test(config) {

tmpdir.refresh();

// OK
test({
cipher: 'aes-128-ccm',
aad: Buffer.alloc(1),
Expand All @@ -131,7 +132,6 @@ test({
plaintextLength: 32768,
});

// Fails the fstream test.
test({
cipher: 'aes-128-ccm',
aad: Buffer.alloc(1),
Expand Down

0 comments on commit 443cace

Please sign in to comment.