Skip to content

Commit

Permalink
fixup! streams: implement streams to webstreams adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Jun 25, 2021
1 parent 2ba3e7c commit f359cb3
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 12 deletions.
44 changes: 32 additions & 12 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
PromiseAll,
PromisePrototypeThen,
PromisePrototypeFinally,
PromiseResolve,
Uint8Array,
} = primordials;

Expand All @@ -23,10 +24,6 @@ const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
setPromiseHandled,
} = require('internal/webstreams/util');

const {
Writable,
Readable,
Expand Down Expand Up @@ -82,16 +79,14 @@ function newWritableStreamFromStreamWritable(streamWritable) {

let controller;
let backpressurePromise;
let stream;
let closed = false;
let closed;
let errored = false;

function onClose() {
closed = true;
streamWritable.off('close', onClose);
streamWritable.off('drain', onDrain);
streamWritable.off('error', onError);
setPromiseHandled(stream.close());
streamWritable.off('finish', onFinish);
}

function onDrain() {
Expand All @@ -103,14 +98,36 @@ function newWritableStreamFromStreamWritable(streamWritable) {
errored = true;
if (backpressurePromise !== undefined)
backpressurePromise.reject(error);
// If closed is not undefined, the error is happening
// after the WritableStream close has already started.
// We need to reject it here.
if (closed !== undefined) {
closed.reject(error);
closed = undefined;
}
controller.error(error);
}

function onFinish() {
// If the finish event happens and closed is defined, that's
// expected and good. If the finish event happens and closed
// is not set, the the stream.Writable closed unexpectedly
// and we need to let the WritableStream know via the controller.
if (closed !== undefined) {
closed.resolve();
closed = undefined;
} else {
controller.error(
new ERR_INVALID_STATE('The stream.Writable closed unexpectedly'));
}
}

streamWritable.once('close', onClose);
streamWritable.once('error', onError);
streamWritable.once('finish', onFinish);
streamWritable.on('drain', onDrain);

stream = new WritableStream({
return new WritableStream({
start(c) { controller = c; },

async write(chunk) {
Expand All @@ -129,12 +146,15 @@ function newWritableStreamFromStreamWritable(streamWritable) {
},

close() {
if (!streamWritable.writableEnded && !closed)
if (closed === undefined && !streamWritable.writableEnded) {
closed = createDeferredPromise();
streamWritable.end();
return closed.promise;
}

return PromiseResolve();
},
}, strategy);

return stream;
}

function newReadableStreamFromStreamReadable(streamReadable) {
Expand Down
17 changes: 17 additions & 0 deletions lib/stream/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ const {
CountQueuingStrategy,
} = require('internal/webstreams/queuingstrategies');

const {
newWritableStreamFromStreamWritable,
newReadableStreamFromStreamReadable,
newStreamWritableFromWritableStream,
newStreamReadableFromReadableStream,
newReadableWritablePairFromDuplex,
newDuplexFromReadableWritablePair,
} = require('internal/webstreams/adapters');

module.exports = {
ReadableStream,
ReadableStreamDefaultReader,
Expand All @@ -45,4 +54,12 @@ module.exports = {
WritableStreamDefaultController,
ByteLengthQueuingStrategy,
CountQueuingStrategy,

// Non-standard Node.js specific Adapter APIs
newWritableStreamFromStreamWritable,
newReadableStreamFromStreamReadable,
newStreamWritableFromWritableStream,
newStreamReadableFromReadableStream,
newReadableWritablePairFromDuplex,
newDuplexFromReadableWritablePair,
};
129 changes: 129 additions & 0 deletions test/parallel/test-whatwg-webstreams-adapters-to-writablestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Flags: --no-warnings

'use strict';

const common = require('../common');

const assert = require('assert');

const {
newWritableStreamFromStreamWritable,
} = require('stream/web');

const {
Writable,
} = require('stream');

class TestWritable extends Writable {
constructor(asyncWrite = false) {
super();
this.chunks = [];
this.asyncWrite = asyncWrite;
}

_write(chunk, encoding, callback) {
this.chunks.push({ chunk, encoding });
if (this.asyncWrite) {
setImmediate(() => callback());
return;
}
callback();
}
}

{
// Closing the WritableStream normally closes the stream.Writable
// without errors.

const writable = new TestWritable();
writable.on('error', common.mustNotCall());
writable.on('finish', common.mustCall());
writable.on('close', common.mustCall());

const writableStream = newWritableStreamFromStreamWritable(writable);

writableStream.close().then(common.mustCall(() => {
assert(writable.destroyed);
}));
}

{
// Aborting the WritableStream errors the stream.Writable

const error = new Error('boom');
const writable = new TestWritable();
writable.on('error', common.mustCall((reason) => {
assert.strictEqual(reason, error);
}));
writable.on('finish', common.mustNotCall());
writable.on('close', common.mustCall());

const writableStream = newWritableStreamFromStreamWritable(writable);

writableStream.abort(error).then(common.mustCall(() => {
assert(writable.destroyed);
}));
}

{
// Destroying the stream.Writable prematurely errors the
// WritableStream

const error = new Error('boom');
const writable = new TestWritable();

const writableStream = newWritableStreamFromStreamWritable(writable);
assert.rejects(writableStream.close(), error);
writable.destroy(error);
}

{
// Ending the stream.Writable directly errors the WritableStream
const writable = new TestWritable();

const writableStream = newWritableStreamFromStreamWritable(writable);

assert.rejects(writableStream.close(), {
code: 'ERR_INVALID_STATE'
});

writable.end();
}

{
const writable = new TestWritable();
const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
}));
}

{
const writable = new TestWritable(true);

writable.on('error', common.mustNotCall());
writable.on('close', common.mustCall());
writable.on('finish', common.mustCall());

const writableStream = newWritableStreamFromStreamWritable(writable);
const writer = writableStream.getWriter();
const ec = new TextEncoder();
writer.write(ec.encode('hello')).then(common.mustCall(() => {
assert.strictEqual(writable.chunks.length, 1);
assert.deepStrictEqual(
writable.chunks[0],
{
chunk: Buffer.from('hello'),
encoding: 'buffer'
});
writer.close().then(common.mustCall());
}));
}

0 comments on commit f359cb3

Please sign in to comment.