Skip to content

Commit

Permalink
stream: support abort signal
Browse files Browse the repository at this point in the history
PR-URL: #36061
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
benjamingr authored and nodejs-github-bot committed Dec 7, 2020
1 parent 5122456 commit 5bd1eec
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 13 deletions.
54 changes: 52 additions & 2 deletions doc/api/stream.md
Expand Up @@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
and [`stream.addAbortSignal()`][].

### Streams Promises API
<!-- YAML
Expand Down Expand Up @@ -1799,6 +1799,55 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.

### `stream.addAbortSignal(signal, stream)`
<!-- YAML
added: REPLACEME
-->
* `signal` {AbortSignal} A signal representing possible cancellation
* `stream` {Stream} a stream to attach a signal to

Attaches an AbortSignal to a readable or writeable stream. This lets code
control stream destruction using an `AbortController`.

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the stream.

```js
const fs = require('fs');

const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json'))
);
// Later, abort the operation closing the stream
controller.abort();
```

Or using an `AbortSignal` with a readable stream as an async iterable:

```js
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json'))
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// The operation was cancelled
} else {
throw e;
}
}
})();
```
## API for stream implementers

<!--type=misc-->
Expand Down Expand Up @@ -3123,6 +3172,7 @@ contain multi-byte characters.
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream
[`stream.uncork()`]: #stream_writable_uncork
[`stream.unpipe()`]: #stream_readable_unpipe_destination
[`stream.wrap()`]: #stream_readable_wrap_stream
Expand Down
12 changes: 4 additions & 8 deletions lib/_http_client.js
Expand Up @@ -51,7 +51,7 @@ const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
const { kOutHeaders, kNeedDrain } = require('internal/http');
const { AbortError, connResetException, codes } = require('internal/errors');
const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
ERR_INVALID_ARG_TYPE,
Expand All @@ -61,14 +61,15 @@ const {
} = codes;
const {
validateInteger,
validateAbortSignal,
} = require('internal/validators');
const { getTimerDuration } = require('internal/timers');
const {
DTRACE_HTTP_CLIENT_REQUEST,
DTRACE_HTTP_CLIENT_RESPONSE
} = require('internal/dtrace');

const { addAbortSignal } = require('stream');

const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
const kError = Symbol('kError');

Expand Down Expand Up @@ -174,12 +175,7 @@ function ClientRequest(input, options, cb) {

const signal = options.signal;
if (signal) {
validateAbortSignal(signal, 'options.signal');
const listener = (e) => this.destroy(new AbortError());
signal.addEventListener('abort', listener);
this.once('close', () => {
signal.removeEventListener('abort', listener);
});
addAbortSignal(signal, this);
}
let method = options.method;
const methodIsString = (typeof method === 'string');
Expand Down
41 changes: 41 additions & 0 deletions lib/internal/streams/add-abort-signal.js
@@ -0,0 +1,41 @@
'use strict';

const {
AbortError,
codes,
} = require('internal/errors');

const eos = require('internal/streams/end-of-stream');
const { ERR_INVALID_ARG_TYPE } = codes;

// This method is inlined here for readable-stream
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
const validateAbortSignal = (signal, name) => {
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
};

function isStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
const onAbort = () => {
stream.destroy(new AbortError());
};
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener('abort', onAbort);
eos(stream, () => signal.removeEventListener('abort', onAbort));
}
return stream;
};
1 change: 1 addition & 0 deletions lib/internal/streams/readable.js
Expand Up @@ -50,6 +50,7 @@ const {
getHighWaterMark,
getDefaultHighWaterMark
} = require('internal/streams/state');

const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
Expand Down
1 change: 1 addition & 0 deletions lib/stream.js
Expand Up @@ -43,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
Stream.finished = eos;

function lazyLoadPromises() {
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Expand Up @@ -245,6 +245,7 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/add-abort-signal.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Expand Up @@ -78,6 +78,7 @@ const expectedModules = new Set([
'NativeModule internal/process/warning',
'NativeModule internal/querystring',
'NativeModule internal/source_map/source_map_cache',
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
Expand Down
32 changes: 31 additions & 1 deletion test/parallel/test-stream-pipeline.js
Expand Up @@ -8,7 +8,8 @@ const {
Transform,
pipeline,
PassThrough,
Duplex
Duplex,
addAbortSignal,
} = require('stream');
const assert = require('assert');
const http = require('http');
Expand Down Expand Up @@ -1261,3 +1262,32 @@ const net = require('net');
() => common.mustNotCall(),
);
}


{
const ac = new AbortController();
const r = Readable.from(async function* () {
for (let i = 0; i < 10; i++) {
await Promise.resolve();
yield String(i);
if (i === 5) {
ac.abort();
}
}
}());
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
const cb = common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
assert.strictEqual(res, '012345');
assert.strictEqual(w.destroyed, true);
assert.strictEqual(r.destroyed, true);
assert.strictEqual(pipelined.destroyed, true);
});
const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb));
}
37 changes: 36 additions & 1 deletion test/parallel/test-stream-readable-destroy.js
@@ -1,7 +1,7 @@
'use strict';

const common = require('../common');
const { Readable } = require('stream');
const { Readable, addAbortSignal } = require('stream');
const assert = require('assert');

{
Expand Down Expand Up @@ -268,3 +268,38 @@ const assert = require('assert');
}));
read.resume();
}

{
const controller = new AbortController();
const read = addAbortSignal(controller.signal, new Readable({
read() {
this.push('asd');
},
}));

read.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
}));
controller.abort();
read.on('data', common.mustNotCall());
}

{
const controller = new AbortController();
const read = addAbortSignal(controller.signal, new Readable({
objectMode: true,
read() {
return false;
}
}));
read.push('asd');

read.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
}));
assert.rejects((async () => {
/* eslint-disable-next-line no-unused-vars */
for await (const chunk of read) {}
})(), /AbortError/);
setTimeout(() => controller.abort(), 0);
}
16 changes: 15 additions & 1 deletion test/parallel/test-stream-writable-destroy.js
@@ -1,7 +1,7 @@
'use strict';

const common = require('../common');
const { Writable } = require('stream');
const { Writable, addAbortSignal } = require('stream');
const assert = require('assert');

{
Expand Down Expand Up @@ -417,3 +417,17 @@ const assert = require('assert');
}));
write.write('asd');
}

{
const ac = new AbortController();
const write = addAbortSignal(ac.signal, new Writable({
write(chunk, enc, cb) { cb(); }
}));

write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write.write('asd');
ac.abort();
}

0 comments on commit 5bd1eec

Please sign in to comment.