diff --git a/lib/internal/blob.js b/lib/internal/blob.js index 0dfa0f47911cf6..2f88f1bbda333d 100644 --- a/lib/internal/blob.js +++ b/lib/internal/blob.js @@ -123,7 +123,9 @@ function getSource(source, endings) { class Blob { /** * @typedef {string|ArrayBuffer|ArrayBufferView|Blob} SourcePart - * + */ + + /** * @param {SourcePart[]} [sources] * @param {{ * endings? : string, diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js new file mode 100644 index 00000000000000..a81c173e4714e9 --- /dev/null +++ b/lib/internal/webstreams/adapters.js @@ -0,0 +1,955 @@ +'use strict'; + +const { + ArrayPrototypeMap, + PromiseAll, + PromisePrototypeThen, + PromisePrototypeFinally, + PromiseResolve, + Uint8Array, +} = primordials; + +const { + ReadableStream, + isReadableStream, +} = require('internal/webstreams/readablestream'); + +const { + WritableStream, + isWritableStream, +} = require('internal/webstreams/writablestream'); + +const { + CountQueuingStrategy, +} = require('internal/webstreams/queuingstrategies'); + +const { + Writable, + Readable, + Duplex, + destroy, +} = require('stream'); + +const { + isDestroyed, + isReadable, + isReadableEnded, + isWritable, + isWritableEnded, +} = require('internal/streams/utils'); + +const { + Buffer, +} = require('buffer'); + +const { + errnoException, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_ARG_VALUE, + ERR_INVALID_STATE, + ERR_STREAM_PREMATURE_CLOSE, + }, + AbortError, +} = require('internal/errors'); + +const { + createDeferredPromise, +} = require('internal/util'); + +const { + validateBoolean, + validateObject, +} = require('internal/validators'); + +const { + WriteWrap, + ShutdownWrap, + kReadBytesOrError, + kLastWriteWasAsync, + streamBaseState, +} = internalBinding('stream_wrap'); + +const finished = require('internal/streams/end-of-stream'); + +const { UV_EOF } = internalBinding('uv'); + +/** + * @typedef {import('../../stream').Writable} Writable + * @typedef {import('../../stream').Readable} Readable + * @typedef {import('./writablestream').WritableStream} WritableStream + * @typedef {import('./readablestream').ReadableStream} ReadableStream + */ + +/** + * @typedef {import('../abort_controller').AbortSignal} AbortSignal + */ + +/** + * @param {Writable} streamWritable + * @returns {WritableStream} + */ +function newWritableStreamFromStreamWritable(streamWritable) { + // Not using the internal/streams/utils isWritableNodeStream utility + // here because it will return false if streamWritable is a Duplex + // whose writable option is false. For a Duplex that is not writable, + // we want it to pass this check but return a closed WritableStream. + if (typeof streamWritable?._writableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE( + 'streamWritable', + 'stream.Writable', + streamWritable); + } + + if (isDestroyed(streamWritable) || !isWritable(streamWritable)) { + const writable = new WritableStream(); + writable.close(); + return writable; + } + + const highWaterMark = streamWritable.writableHighWaterMark; + const strategy = + streamWritable.writableObjectMode ? + new CountQueuingStrategy({ highWaterMark }) : + { highWaterMark }; + + let controller; + let backpressurePromise; + let closed; + + function onDrain() { + if (backpressurePromise !== undefined) + backpressurePromise.resolve(); + } + + const cleanup = finished(streamWritable, (error) => { + if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') { + const err = new AbortError(undefined, { cause: error }); + error = err; + } + + cleanup(); + // This is a protection against non-standard, legacy streams + // that happen to emit an error event again after finished is called. + streamWritable.on('error', () => {}); + if (error != null) { + if (backpressurePromise !== undefined) + backpressurePromise.reject(error); + // If closed is not undefined, the error is happening + // after the WritableStream close has already started. + // We need to reject it here. + if (closed !== undefined) { + closed.reject(error); + closed = undefined; + } + controller.error(error); + controller = undefined; + return; + } + + if (closed !== undefined) { + closed.resolve(); + closed = undefined; + return; + } + controller.error(new AbortError()); + controller = undefined; + }); + + streamWritable.on('drain', onDrain); + + return new WritableStream({ + start(c) { controller = c; }, + + async write(chunk) { + if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) { + backpressurePromise = createDeferredPromise(); + return PromisePrototypeFinally( + backpressurePromise.promise, () => { + backpressurePromise = undefined; + }); + } + }, + + abort(reason) { + destroy(streamWritable, reason); + }, + + close() { + if (closed === undefined && !isWritableEnded(streamWritable)) { + closed = createDeferredPromise(); + streamWritable.end(); + return closed.promise; + } + + controller = undefined; + return PromiseResolve(); + }, + }, strategy); +} + +/** + * @param {WritableStream} writableStream + * @param {{ + * decodeStrings? : boolean, + * highWaterMark? : number, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Writable} + */ +function newStreamWritableFromWritableStream(writableStream, options = {}) { + if (!isWritableStream(writableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'writableStream', + 'WritableStream', + writableStream); + } + + validateObject(options, 'options'); + const { + highWaterMark, + decodeStrings = true, + objectMode = false, + signal, + } = options; + + validateBoolean(objectMode, 'options.objectMode'); + validateBoolean(decodeStrings, 'options.decodeStrings'); + + const writer = writableStream.getWriter(); + let closed = false; + + const writable = new Writable({ + highWaterMark, + objectMode, + decodeStrings, + signal, + + writev(chunks, callback) { + function done(error) { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => destroy(writable, error)); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + PromiseAll( + ArrayPrototypeMap( + chunks, + (chunk) => writer.write(chunk))), + done, + done); + }, + done); + }, + + write(chunk, encoding, callback) { + if (typeof chunk === 'string' && decodeStrings && !objectMode) { + chunk = Buffer.from(chunk, encoding); + chunk = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength); + } + + function done(error) { + try { + callback(error); + } catch (error) { + destroy(writable, error); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + writer.write(chunk), + done, + done); + }, + done); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => { throw error; }); + } + } + + if (!closed) { + if (error != null) { + PromisePrototypeThen( + writer.abort(error), + done, + done); + } else { + PromisePrototypeThen( + writer.close(), + done, + done); + } + return; + } + + done(); + }, + + final(callback) { + function done(error) { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => destroy(writable, error)); + } + } + + if (!closed) { + PromisePrototypeThen( + writer.close(), + done, + done); + } + }, + }); + + PromisePrototypeThen( + writer.closed, + () => { + // If the WritableStream closes before the stream.Writable has been + // ended, we signal an error on the stream.Writable. + closed = true; + if (!isWritableEnded(writable)) + destroy(writable, new ERR_STREAM_PREMATURE_CLOSE()); + }, + (error) => { + // If the WritableStream errors before the stream.Writable has been + // destroyed, signal an error on the stream.Writable. + closed = true; + destroy(writable, error); + }); + + return writable; +} + +/** + * @param {Readable} streamReadable + * @returns {ReadableStream} + */ +function newReadableStreamFromStreamReadable(streamReadable) { + // Not using the internal/streams/utils isReadableNodeStream utility + // here because it will return false if streamReadable is a Duplex + // whose readable option is false. For a Duplex that is not readable, + // we want it to pass this check but return a closed ReadableStream. + if (typeof streamReadable?._readableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE( + 'streamReadable', + 'stream.Readable', + streamReadable); + } + + if (isDestroyed(streamReadable) || !isReadable(streamReadable)) { + const readable = new ReadableStream(); + readable.cancel(); + return readable; + } + + const objectMode = streamReadable.readableObjectMode; + const highWaterMark = streamReadable.readableHighWaterMark; + // When not running in objectMode explicitly, we just fall + // back to a minimal strategy that just specifies the highWaterMark + // and no size algorithm. Using a ByteLengthQueuingStrategy here + // is unnecessary. + const strategy = + objectMode ? + new CountQueuingStrategy({ highWaterMark }) : + { highWaterMark }; + + let controller; + + function onData(chunk) { + // Copy the Buffer to detach it from the pool. + if (Buffer.isBuffer(chunk) && !objectMode) + chunk = new Uint8Array(chunk); + controller.enqueue(chunk); + if (controller.desiredSize <= 0) + streamReadable.pause(); + } + + streamReadable.pause(); + + const cleanup = finished(streamReadable, (error) => { + if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') { + const err = new AbortError(undefined, { cause: error }); + error = err; + } + + cleanup(); + // This is a protection against non-standard, legacy streams + // that happen to emit an error event again after finished is called. + streamReadable.on('error', () => {}); + if (error) + return controller.error(error); + controller.close(); + }); + + streamReadable.on('data', onData); + + return new ReadableStream({ + start(c) { controller = c; }, + + pull() { streamReadable.resume(); }, + + cancel(reason) { + destroy(streamReadable, reason); + }, + }, strategy); +} + +/** + * @param {ReadableStream} readableStream + * @param {{ + * highWaterMark? : number, + * encoding? : string, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Readable} + */ +function newStreamReadableFromReadableStream(readableStream, options = {}) { + if (!isReadableStream(readableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'readableStream', + 'ReadableStream', + readableStream); + } + + validateObject(options, 'options'); + const { + highWaterMark, + encoding, + objectMode = false, + signal, + } = options; + + if (encoding !== undefined && !Buffer.isEncoding(encoding)) + throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding'); + validateBoolean(objectMode, 'options.objectMode'); + + const reader = readableStream.getReader(); + let closed = false; + + const readable = new Readable({ + objectMode, + highWaterMark, + encoding, + signal, + + read() { + PromisePrototypeThen( + reader.read(), + (chunk) => { + if (chunk.done) { + // Value should always be undefined here. + readable.push(null); + } else { + readable.push(chunk.value); + } + }, + (error) => destroy(readable, error)); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => { throw error; }); + } + } + + if (!closed) { + PromisePrototypeThen( + reader.cancel(error), + done, + done); + return; + } + done(); + }, + }); + + PromisePrototypeThen( + reader.closed, + () => { + closed = true; + if (!isReadableEnded(readable)) + readable.push(null); + }, + (error) => { + closed = true; + destroy(readable, error); + }); + + return readable; +} + +/** + * @typedef {import('./readablestream').ReadableWritablePair + * } ReadableWritablePair + * @typedef {import('../../stream').Duplex} Duplex + */ + +/** + * @param {Duplex} duplex + * @returns {ReadableWritablePair} + */ +function newReadableWritablePairFromDuplex(duplex) { + // Not using the internal/streams/utils isWritableNodeStream and + // isReadableNodestream utilities here because they will return false + // if the duplex was created with writable or readable options set to + // false. Instead, we'll check the readable and writable state after + // and return closed WritableStream or closed ReadableStream as + // necessary. + if (typeof duplex?._writableState !== 'object' || + typeof duplex?._readableState !== 'object') { + throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex); + } + + if (isDestroyed(duplex)) { + const writable = new WritableStream(); + const readable = new ReadableStream(); + writable.close(); + readable.cancel(); + return { readable, writable }; + } + + const writable = + isWritable(duplex) ? + newWritableStreamFromStreamWritable(duplex) : + new WritableStream(); + + if (!isWritable(duplex)) + writable.close(); + + const readable = + isReadable(duplex) ? + newReadableStreamFromStreamReadable(duplex) : + new ReadableStream(); + + if (!isReadable(duplex)) + readable.cancel(); + + return { writable, readable }; +} + +/** + * @param {ReadableWritablePair} pair + * @param {{ + * allowHalfOpen? : boolean, + * decodeStrings? : boolean, + * encoding? : string, + * highWaterMark? : number, + * objectMode? : boolean, + * signal? : AbortSignal, + * }} [options] + * @returns {Duplex} + */ +function newStreamDuplexFromReadableWritablePair(pair = {}, options = {}) { + validateObject(pair, 'pair'); + const { + readable: readableStream, + writable: writableStream, + } = pair; + + if (!isReadableStream(readableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'pair.readable', + 'ReadableStream', + readableStream); + } + if (!isWritableStream(writableStream)) { + throw new ERR_INVALID_ARG_TYPE( + 'pair.writable', + 'WritableStream', + writableStream); + } + + validateObject(options, 'options'); + const { + allowHalfOpen = false, + objectMode = false, + encoding, + decodeStrings = true, + highWaterMark, + signal, + } = options; + + validateBoolean(objectMode, 'options.objectMode'); + if (encoding !== undefined && !Buffer.isEncoding(encoding)) + throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding'); + + const writer = writableStream.getWriter(); + const reader = readableStream.getReader(); + let writableClosed = false; + let readableClosed = false; + + const duplex = new Duplex({ + allowHalfOpen, + highWaterMark, + objectMode, + encoding, + decodeStrings, + signal, + + writev(chunks, callback) { + function done(error) { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => destroy(duplex, error)); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + PromiseAll( + ArrayPrototypeMap( + chunks, + (chunk) => writer.write(chunk))), + done, + done); + }, + done); + }, + + write(chunk, encoding, callback) { + if (typeof chunk === 'string' && decodeStrings && !objectMode) { + chunk = Buffer.from(chunk, encoding); + chunk = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength); + } + + function done(error) { + try { + callback(error); + } catch (error) { + destroy(duplex, error); + } + } + + PromisePrototypeThen( + writer.ready, + () => { + return PromisePrototypeThen( + writer.write(chunk), + done, + done); + }, + done); + }, + + final(callback) { + function done(error) { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => destroy(duplex, error)); + } + } + + if (!writableClosed) { + PromisePrototypeThen( + writer.close(), + done, + done); + } + }, + + read() { + PromisePrototypeThen( + reader.read(), + (chunk) => { + if (chunk.done) { + duplex.push(null); + } else { + duplex.push(chunk.value); + } + }, + (error) => destroy(duplex, error)); + }, + + destroy(error, callback) { + function done() { + try { + callback(error); + } catch (error) { + // In a next tick because this is happening within + // a promise context, and if there are any errors + // thrown we don't want those to cause an unhandled + // rejection. Let's just escape the promise and + // handle it separately. + process.nextTick(() => { throw error; }); + } + } + + async function closeWriter() { + if (!writableClosed) + await writer.abort(error); + } + + async function closeReader() { + if (!readableClosed) + await reader.cancel(error); + } + + if (!writableClosed || !readableClosed) { + PromisePrototypeThen( + PromiseAll([ + closeWriter(), + closeReader(), + ]), + done, + done); + return; + } + + done(); + }, + }); + + PromisePrototypeThen( + writer.closed, + () => { + writableClosed = true; + if (!isWritableEnded(duplex)) + destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE()); + }, + (error) => { + writableClosed = true; + readableClosed = true; + destroy(duplex, error); + }); + + PromisePrototypeThen( + reader.closed, + () => { + readableClosed = true; + if (!isReadableEnded(duplex)) + duplex.push(null); + }, + (error) => { + writableClosed = true; + readableClosed = true; + destroy(duplex, error); + }); + + return duplex; +} + +/** + * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy + * @typedef {{}} StreamBase + * @param {StreamBase} streamBase + * @param {QueuingStrategy} strategy + * @returns {WritableStream} + */ +function newWritableStreamFromStreamBase(streamBase, strategy) { + validateObject(streamBase, 'streamBase'); + + let current; + + function createWriteWrap(controller, promise) { + const req = new WriteWrap(); + req.handle = streamBase; + req.oncomplete = onWriteComplete; + req.async = false; + req.bytes = 0; + req.buffer = null; + req.controller = controller; + req.promise = promise; + return req; + } + + function onWriteComplete(status) { + if (status < 0) { + const error = errnoException(status, 'write', this.error); + this.promise.reject(error); + this.controller.error(error); + return; + } + this.promise.resolve(); + } + + function doWrite(chunk, controller) { + const promise = createDeferredPromise(); + let ret; + let req; + try { + req = createWriteWrap(controller, promise); + ret = streamBase.writeBuffer(req, chunk); + if (streamBaseState[kLastWriteWasAsync]) + req.buffer = chunk; + req.async = !!streamBaseState[kLastWriteWasAsync]; + } catch (error) { + promise.reject(error); + } + + if (ret !== 0) + promise.reject(errnoException(ret, 'write', req)); + else if (!req.async) + promise.resolve(); + + return promise.promise; + } + + return new WritableStream({ + write(chunk, controller) { + current = current !== undefined ? + PromisePrototypeThen( + current, + () => doWrite(chunk, controller), + (error) => controller.error(error)) : + doWrite(chunk, controller); + return current; + }, + + close() { + const promise = createDeferredPromise(); + const req = new ShutdownWrap(); + req.oncomplete = () => promise.resolve(); + const err = streamBase.shutdown(req); + if (err === 1) + promise.resolve(); + return promise.promise; + }, + }, strategy); +} + +/** + * @param {StreamBase} streamBase + * @param {QueuingStrategy} strategy + * @returns {ReadableStream} + */ +function newReadableStreamFromStreamBase(streamBase, strategy, options = {}) { + validateObject(streamBase, 'streamBase'); + validateObject(options, 'options'); + + const { + ondone = () => {}, + } = options; + + if (typeof streamBase.onread === 'function') + throw new ERR_INVALID_STATE('StreamBase already has a consumer'); + + if (typeof ondone !== 'function') + throw new ERR_INVALID_ARG_TYPE('options.ondone', 'Function', ondone); + + let controller; + + streamBase.onread = (arrayBuffer) => { + const nread = streamBaseState[kReadBytesOrError]; + + if (nread === 0) + return; + + try { + if (nread === UV_EOF) { + controller.close(); + streamBase.readStop(); + try { + ondone(); + } catch (error) { + controller.error(error); + } + return; + } + + controller.enqueue(arrayBuffer); + + if (controller.desiredSize <= 0) + streamBase.readStop(); + } catch (error) { + controller.error(error); + streamBase.readStop(); + } + }; + + return new ReadableStream({ + start(c) { controller = c; }, + + pull() { + streamBase.readStart(); + }, + + cancel() { + const promise = createDeferredPromise(); + try { + ondone(); + } catch (error) { + promise.reject(error); + return promise.promise; + } + const req = new ShutdownWrap(); + req.oncomplete = () => promise.resolve(); + const err = streamBase.shutdown(req); + if (err === 1) + promise.resolve(); + return promise.promise; + }, + }, strategy); +} + +module.exports = { + newWritableStreamFromStreamWritable, + newReadableStreamFromStreamReadable, + newStreamWritableFromWritableStream, + newStreamReadableFromReadableStream, + newReadableWritablePairFromDuplex, + newStreamDuplexFromReadableWritablePair, + newWritableStreamFromStreamBase, + newReadableStreamFromStreamBase, +}; diff --git a/lib/internal/webstreams/queuingstrategies.js b/lib/internal/webstreams/queuingstrategies.js index 2db51b92a7f502..cd8ec2bf35bfd0 100644 --- a/lib/internal/webstreams/queuingstrategies.js +++ b/lib/internal/webstreams/queuingstrategies.js @@ -38,7 +38,9 @@ const isCountQueuingStrategy = * @callback QueuingStrategySize * @param {any} chunk * @returns {number} - * + */ + +/** * @typedef {{ * highWaterMark : number, * size? : QueuingStrategySize, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index b4a31d146db2e0..cdabebefe999f0 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -140,37 +140,53 @@ const kPull = Symbol('kPull'); * @typedef {import('./queuingstrategies').QueuingStrategySize * } QueuingStrategySize * @typedef {import('./writablestream').WritableStream} WritableStream - * + */ + +/** * @typedef {ReadableStreamDefaultController | ReadableByteStreamController * } ReadableStreamController - * + */ + +/** * @typedef {ReadableStreamDefaultReader | ReadableStreamBYOBReader * } ReadableStreamReader - * + */ + +/** * @callback UnderlyingSourceStartCallback * @param {ReadableStreamController} controller * @returns { any | Promise } - * + */ + +/** * @callback UnderlyingSourcePullCallback * @param {ReadableStreamController} controller * @returns { Promise } - * + */ + +/** * @callback UnderlyingSourceCancelCallback * @param {any} reason * @returns { Promise } - * + */ + +/** * @typedef {{ * readable: ReadableStream, * writable: WritableStream, * }} ReadableWritablePair - * + */ + +/** * @typedef {{ * preventClose? : boolean, * preventAbort? : boolean, * preventCancel? : boolean, * signal? : AbortSignal, * }} StreamPipeOptions - * + */ + +/** * @typedef {{ * start? : UnderlyingSourceStartCallback, * pull? : UnderlyingSourcePullCallback, @@ -178,7 +194,6 @@ const kPull = Symbol('kPull'); * type? : "bytes", * autoAllocateChunkSize? : number * }} UnderlyingSource - * */ class ReadableStream { diff --git a/lib/internal/webstreams/transformstream.js b/lib/internal/webstreams/transformstream.js index b4e690daa98c4a..e3cfd0362a26a5 100644 --- a/lib/internal/webstreams/transformstream.js +++ b/lib/internal/webstreams/transformstream.js @@ -70,19 +70,27 @@ const assert = require('internal/assert'); * } QueuingStrategy * @typedef {import('./queuingstrategies').QueuingStrategySize * } QueuingStrategySize - * + */ + +/** * @callback TransformerStartCallback * @param {TransformStreamDefaultController} controller; - * + */ + +/** * @callback TransformerFlushCallback * @param {TransformStreamDefaultController} controller; * @returns {Promise} - * + */ + +/** * @callback TransformerTransformCallback * @param {any} chunk * @param {TransformStreamDefaultController} controller * @returns {Promise} - * + */ + +/** * @typedef {{ * start? : TransformerStartCallback, * transform? : TransformerTransformCallback, diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index dba7560c549a9a..fa0d3b78e3e1ea 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -83,22 +83,32 @@ const kError = Symbol('kError'); * } QueuingStrategy * @typedef {import('./queuingstrategies').QueuingStrategySize * } QueuingStrategySize - * + */ + +/** * @callback UnderlyingSinkStartCallback * @param {WritableStreamDefaultController} controller - * + */ + +/** * @callback UnderlyingSinkWriteCallback * @param {any} chunk * @param {WritableStreamDefaultController} controller * @returns {Promise} - * + */ + +/** * @callback UnderlyingSinkCloseCallback * @returns {Promise} - * + */ + +/** * @callback UnderlyingSinkAbortCallback * @param {any} reason * @returns {Promise} - * + */ + +/** * @typedef {{ * start? : UnderlyingSinkStartCallback, * write? : UnderlyingSinkWriteCallback, diff --git a/tools/eslint-rules/crypto-check.js b/tools/eslint-rules/crypto-check.js index def002ef4c6eda..39d29bba6e0a3d 100644 --- a/tools/eslint-rules/crypto-check.js +++ b/tools/eslint-rules/crypto-check.js @@ -1,9 +1,6 @@ /** * @fileoverview Check that common.hasCrypto is used if crypto, tls, * https, or http2 modules are required. - * - * This rule can be ignored using // eslint-disable-line crypto-check - * * @author Daniel Bevenius */ 'use strict';