Skip to content

Commit

Permalink
stream: implement streams to webstreams adapters
Browse files Browse the repository at this point in the history
Experimental adapters for the webstreams API

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: nodejs#39134
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
jasnell authored and Mesteery committed Oct 9, 2021
1 parent 03c2432 commit 155b677
Show file tree
Hide file tree
Showing 13 changed files with 2,344 additions and 1 deletion.
68 changes: 67 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2080,7 +2080,7 @@ for performance reasons.

### `stream.Readable.fromWeb(readableStream[, options])`
<!-- YAML
added: v16.11.0
added: REPLACEME
-->

> Stability: 1 - Experimental
Expand All @@ -2093,6 +2093,41 @@ added: v16.11.0
* `signal` {AbortSignal}
* Returns: {stream.Readable}

### `stream.Readable.toWeb(streamReadable)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamReadable` {stream.Readable}
* Returns: {ReadableStream}

### `stream.Writable.fromWeb(writableStream[, options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `writableStream` {WritableStream}
* `options` {Object}
* `decodeStrings` {boolean}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Writable}

### `stream.Writable.toWeb(streamWritable)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamWritable` {stream.Writable}
* Returns: {WritableStream}

### `stream.Readable.isDisturbed(stream)`
<!-- YAML
added: v16.8.0
Expand Down Expand Up @@ -2133,6 +2168,37 @@ A utility method for creating duplex streams.
* `Promise` converts into readable `Duplex`. Value `null` is ignored.
* Returns: {stream.Duplex}

### `stream.Duplex.fromWeb(pair[, options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `pair` {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}
* `options` {Object}
* `allowHalfOpen` {boolean}
* `decodeStrings` {boolean}
* `encoding` {string}
* `highWaterMark` {number}
* `objectMode` {boolean}
* `signal` {AbortSignal}
* Returns: {stream.Duplex}

### `stream.Duplex.toWeb(streamDuplex)`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `streamDuplex` {stream.Duplex}
* Returns: {Object}
* `readable` {ReadableStream}
* `writable` {WritableStream}

### `stream.addAbortSignal(signal, stream)`
<!-- YAML
added: v15.4.0
Expand Down
2 changes: 2 additions & 0 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ module.exports = {
appendFile,
readFile,
watch,

kHandle,
},

FileHandle,
Expand Down
19 changes: 19 additions & 0 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,22 @@ Duplex.from = function(body) {
}
return duplexify(body, 'body');
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Duplex.fromWeb = function(pair, options) {
return lazyWebStreams().newStreamDuplexFromReadableWritablePair(
pair,
options);
};

Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
};
19 changes: 19 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1384,3 +1384,22 @@ Readable.wrap = function(src, options) {
}
}).wrap(src);
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Readable.fromWeb = function(readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(
readableStream,
options);
};

Readable.toWeb = function(streamReadable) {
return lazyWebStreams().newStreamReadableFromReadableStream(streamReadable);
};
19 changes: 19 additions & 0 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -871,3 +871,22 @@ Writable.prototype._destroy = function(err, cb) {
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
if (webStreamsAdapters === undefined)
webStreamsAdapters = require('internal/webstreams/adapters');
return webStreamsAdapters;
}

Writable.fromWeb = function(writableStream, options) {
return lazyWebStreams().newStreamWritableFromWritableStream(
writableStream,
options);
};

Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};

0 comments on commit 155b677

Please sign in to comment.