Skip to content

Commit

Permalink
stream: add isErrored helper
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 9, 2021
1 parent 1fa507f commit 46092fb
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 2 deletions.
13 changes: 13 additions & 0 deletions doc/api/stream.md
Expand Up @@ -2225,6 +2225,19 @@ added: v16.8.0

Returns whether the stream has been read from or cancelled.

### `stream.Readable.isErrored(stream)`

<!-- YAML
added: v16.8.0
-->

> Stability: 1 - Experimental
* `stream` {stream.Readable|ReadableStream}
* Returns: `boolean`

Returns whether the stream has been errored.

### `stream.Readable.toWeb(streamReadable)`

<!-- YAML
Expand Down
14 changes: 14 additions & 0 deletions lib/internal/streams/utils.js
Expand Up @@ -245,10 +245,24 @@ function isDisturbed(stream) {
));
}

function isErrored(stream) {
return !!(stream && (
stream.readableErrored ||
stream.writableErrored ||
stream._readableState?.errorEmitted ||
stream._writableState?.errorEmitted ||
stream._readableState?.errored ||
stream._writableState?.errored ||
stream[kIsErrored]
));
}

module.exports = {
kDestroyed,
isDisturbed,
isErrored,
kIsDisturbed,
kIsErrored,
isClosed,
isDestroyed,
isDuplexNodeStream,
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/webstreams/readablestream.js
Expand Up @@ -241,6 +241,10 @@ class ReadableStream {
return this[kState].disturbed;
}

get [kIsErrored]() {
return this[kState].state === 'errored';
}

/**
* @readonly
* @type {boolean}
Expand Down
1 change: 1 addition & 0 deletions lib/stream.js
Expand Up @@ -39,6 +39,7 @@ const promises = require('stream/promises');

const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
Stream.isErrored = require('internal/streams/utils').isErrored;
Stream.Readable = require('internal/streams/readable');
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Expand Down
7 changes: 5 additions & 2 deletions test/parallel/test-stream-readable-didRead.js
@@ -1,15 +1,18 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { isDisturbed, Readable } = require('stream');
const { isDisturbed, isErrored, Readable } = require('stream');

function noop() {}

function check(readable, data, fn) {
assert.strictEqual(readable.readableDidRead, false);
assert.strictEqual(isDisturbed(readable), false);
assert.strictEqual(isErrored(readable), false);
if (data === -1) {
readable.on('error', common.mustCall());
readable.on('error', common.mustCall(() => {
assert.strictEqual(isErrored(readable), true)
}));
readable.on('data', common.mustNotCall());
readable.on('end', common.mustNotCall());
} else {
Expand Down
17 changes: 17 additions & 0 deletions test/parallel/test-whatwg-readablestream.js
Expand Up @@ -1572,3 +1572,20 @@ class Source {
isDisturbed(stream, true);
})().then(common.mustCall());
}


{
const stream = new ReadableStream({
start(controller) {
controller.error(new Error());
},
pull: common.mustNotCall(),
});

const reader = stream.getReader();
(async () => {
isErrored(stream, false);
await reader.read();
isErrored(stream, true);
})().then(common.mustCall());
}

0 comments on commit 46092fb

Please sign in to comment.