Skip to content

Commit af29120

Browse files
CGQAQmarco-ippolito
authored andcommittedJun 17, 2024
stream: use ByteLengthQueuingStrategy when not in objectMode
Fixes: #46347 PR-URL: #48847 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent be309bd commit af29120

File tree

2 files changed

+64
-5
lines changed

2 files changed

+64
-5
lines changed
 

‎lib/internal/webstreams/adapters.js

+2-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const {
2525

2626
const {
2727
CountQueuingStrategy,
28+
ByteLengthQueuingStrategy,
2829
} = require('internal/webstreams/queuingstrategies');
2930

3031
const {
@@ -417,11 +418,7 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
417418
return new CountQueuingStrategy({ highWaterMark });
418419
}
419420

420-
// When not running in objectMode explicitly, we just fall
421-
// back to a minimal strategy that just specifies the highWaterMark
422-
// and no size algorithm. Using a ByteLengthQueuingStrategy here
423-
// is unnecessary.
424-
return { highWaterMark };
421+
return new ByteLengthQueuingStrategy({ highWaterMark });
425422
};
426423

427424
const strategy = evaluateStrategyOrFallback(options?.strategy);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto) { common.skip('missing crypto'); }
4+
5+
const { Readable } = require('stream');
6+
const process = require('process');
7+
const { randomBytes } = require('crypto');
8+
const assert = require('assert');
9+
10+
// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
11+
// edit: make it cross-platform as /dev/urandom is not available on Windows
12+
{
13+
let currentMemoryUsage = process.memoryUsage().arrayBuffers;
14+
15+
// We initialize a stream, but not start consuming it
16+
const randomNodeStream = new Readable({
17+
read(size) {
18+
randomBytes(size, (err, buffer) => {
19+
if (err) {
20+
// If an error occurs, emit an 'error' event
21+
this.emit('error', err);
22+
return;
23+
}
24+
25+
// Push the random bytes to the stream
26+
this.push(buffer);
27+
});
28+
}
29+
});
30+
// after 2 seconds, it'll get converted to web stream
31+
let randomWebStream;
32+
33+
// We check memory usage every second
34+
// since it's a stream, it shouldn't be higher than the chunk size
35+
const reportMemoryUsage = () => {
36+
const { arrayBuffers } = process.memoryUsage();
37+
currentMemoryUsage = arrayBuffers;
38+
39+
assert(currentMemoryUsage <= 256 * 1024 * 1024);
40+
};
41+
setInterval(reportMemoryUsage, 1000);
42+
43+
// after 1 second we use Readable.toWeb
44+
// memory usage should stay pretty much the same since it's still a stream
45+
setTimeout(() => {
46+
randomWebStream = Readable.toWeb(randomNodeStream);
47+
}, 1000);
48+
49+
// after 2 seconds we start consuming the stream
50+
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
51+
setTimeout(async () => {
52+
// eslint-disable-next-line no-unused-vars
53+
for await (const _ of randomWebStream) {
54+
// Do nothing, just let the stream flow
55+
}
56+
}, 2000);
57+
58+
setTimeout(() => {
59+
// Test considered passed if we don't crash
60+
process.exit(0);
61+
}, 5000);
62+
}

0 commit comments

Comments
 (0)
Please sign in to comment.