Skip to content

Commit a0b9853

Browse files
debadree25MoLow
authored andcommittedJul 6, 2023
fs: implement byob mode for readableWebStream()
Fixes: #45853 PR-URL: #46933 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 0a65c7c commit a0b9853

File tree

3 files changed

+152
-10
lines changed

3 files changed

+152
-10
lines changed
 

‎doc/api/fs.md

+9-1
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,22 @@ Reads data from the file and stores that in the given buffer.
444444
If the file is not modified concurrently, the end-of-file is reached when the
445445
number of bytes read is zero.
446446
447-
#### `filehandle.readableWebStream()`
447+
#### `filehandle.readableWebStream(options)`
448448
449449
<!-- YAML
450450
added: v17.0.0
451+
changes:
452+
- version: REPLACEME
453+
pr-url: https://github.com/nodejs/node/pull/46933
454+
description: Added option to create a 'bytes' stream.
451455
-->
452456
453457
> Stability: 1 - Experimental
454458
459+
* `options` {Object}
460+
* `type` {string|undefined} Whether to open a normal or a `'bytes'` stream.
461+
**Default:** `undefined`
462+
455463
* Returns: {ReadableStream}
456464
457465
Returns a `ReadableStream` that may be used to read the files data.

‎lib/internal/fs/promises.js

+59-9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const {
1414
SafePromisePrototypeFinally,
1515
Symbol,
1616
Uint8Array,
17+
FunctionPrototypeBind,
1718
} = primordials;
1819

1920
const { fs: constants } = internalBinding('constants');
@@ -252,7 +253,7 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
252253
* } ReadableStream
253254
* @returns {ReadableStream}
254255
*/
255-
readableWebStream() {
256+
readableWebStream(options = kEmptyObject) {
256257
if (this[kFd] === -1)
257258
throw new ERR_INVALID_STATE('The FileHandle is closed');
258259
if (this[kClosePromise])
@@ -261,15 +262,64 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
261262
throw new ERR_INVALID_STATE('The FileHandle is locked');
262263
this[kLocked] = true;
263264

264-
const readable = newReadableStreamFromStreamBase(
265-
this[kHandle],
266-
undefined,
267-
{ ondone: () => this[kUnref]() });
265+
if (options.type !== undefined) {
266+
validateString(options.type, 'options.type');
267+
}
268268

269-
this[kRef]();
270-
this.once('close', () => {
271-
readableStreamCancel(readable);
272-
});
269+
let readable;
270+
271+
if (options.type !== 'bytes') {
272+
const {
273+
newReadableStreamFromStreamBase,
274+
} = require('internal/webstreams/adapters');
275+
readable = newReadableStreamFromStreamBase(
276+
this[kHandle],
277+
undefined,
278+
{ ondone: () => this[kUnref]() });
279+
280+
const {
281+
readableStreamCancel,
282+
} = require('internal/webstreams/readablestream');
283+
this[kRef]();
284+
this.once('close', () => {
285+
readableStreamCancel(readable);
286+
});
287+
} else {
288+
const {
289+
readableStreamCancel,
290+
ReadableStream,
291+
} = require('internal/webstreams/readablestream');
292+
293+
const readFn = FunctionPrototypeBind(this.read, this);
294+
const ondone = FunctionPrototypeBind(this[kUnref], this);
295+
296+
readable = new ReadableStream({
297+
type: 'bytes',
298+
autoAllocateChunkSize: 16384,
299+
300+
async pull(controller) {
301+
const view = controller.byobRequest.view;
302+
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);
303+
304+
if (bytesRead === 0) {
305+
ondone();
306+
controller.close();
307+
}
308+
309+
controller.byobRequest.respond(bytesRead);
310+
},
311+
312+
cancel() {
313+
ondone();
314+
},
315+
});
316+
317+
this[kRef]();
318+
319+
this.once('close', () => {
320+
readableStreamCancel(readable);
321+
});
322+
}
273323

274324
return readable;
275325
}

‎test/parallel/test-filehandle-readablestream.js

+84
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
8686
mc.port1.close();
8787
await file.close();
8888
})().then(common.mustCall());
89+
90+
// Make sure 'bytes' stream works
91+
(async () => {
92+
const file = await open(__filename);
93+
const dec = new TextDecoder();
94+
const readable = file.readableWebStream({ type: 'bytes' });
95+
const reader = readable.getReader({ mode: 'byob' });
96+
97+
let data = '';
98+
let result;
99+
do {
100+
const buff = new ArrayBuffer(100);
101+
result = await reader.read(new DataView(buff));
102+
if (result.value !== undefined) {
103+
data += dec.decode(result.value);
104+
assert.ok(result.value.byteLength <= 100);
105+
}
106+
} while (!result.done);
107+
108+
assert.strictEqual(check, data);
109+
110+
assert.throws(() => file.readableWebStream(), {
111+
code: 'ERR_INVALID_STATE',
112+
});
113+
114+
await file.close();
115+
})().then(common.mustCall());
116+
117+
// Make sure that acquiring a ReadableStream 'bytes' stream
118+
// fails if the FileHandle is already closed.
119+
(async () => {
120+
const file = await open(__filename);
121+
await file.close();
122+
123+
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
124+
code: 'ERR_INVALID_STATE',
125+
});
126+
})().then(common.mustCall());
127+
128+
// Make sure that acquiring a ReadableStream 'bytes' stream
129+
// fails if the FileHandle is already closing.
130+
(async () => {
131+
const file = await open(__filename);
132+
file.close();
133+
134+
assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
135+
code: 'ERR_INVALID_STATE',
136+
});
137+
})().then(common.mustCall());
138+
139+
// Make sure the 'bytes' ReadableStream is closed when the underlying
140+
// FileHandle is closed.
141+
(async () => {
142+
const file = await open(__filename);
143+
const readable = file.readableWebStream({ type: 'bytes' });
144+
const reader = readable.getReader({ mode: 'byob' });
145+
file.close();
146+
await reader.closed;
147+
})().then(common.mustCall());
148+
149+
// Make sure the 'bytes' ReadableStream is closed when the underlying
150+
// FileHandle is closed.
151+
(async () => {
152+
const file = await open(__filename);
153+
const readable = file.readableWebStream({ type: 'bytes' });
154+
file.close();
155+
const reader = readable.getReader({ mode: 'byob' });
156+
await reader.closed;
157+
})().then(common.mustCall());
158+
159+
// Make sure that the FileHandle is properly marked "in use"
160+
// when a 'bytes' ReadableStream has been acquired for it.
161+
(async () => {
162+
const file = await open(__filename);
163+
file.readableWebStream({ type: 'bytes' });
164+
const mc = new MessageChannel();
165+
mc.port1.onmessage = common.mustNotCall();
166+
assert.throws(() => mc.port2.postMessage(file, [file]), {
167+
code: 25,
168+
name: 'DataCloneError',
169+
});
170+
mc.port1.close();
171+
await file.close();
172+
})().then(common.mustCall());

0 commit comments

Comments
 (0)
Please sign in to comment.