/
test-stream-compose-operator.js
127 lines (110 loc) Β· 2.86 KB
/
test-stream-compose-operator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
'use strict';
const common = require('../common');
const {
Readable, Transform,
} = require('stream');
const assert = require('assert');
{
// with async generator
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
let str = '';
for await (const chunk of stream) {
str += chunk;
if (str.length === 2) {
yield str;
str = '';
}
}
});
const result = ['ab', 'cd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// With Transformer
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
objectMode: true,
transform: common.mustCall((chunk, encoding, callback) => {
callback(null, chunk);
}, 4)
}));
const result = ['a', 'b', 'c', 'd'];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}
{
// Throwing an error during `compose` (before waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
throw new Error('boom');
});
assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, /boom/).then(common.mustCall());
}
{
// Throwing an error during `compose` (when waiting for data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
for await (const chunk of stream) {
if (chunk === 3) {
throw new Error('boom');
}
yield chunk;
}
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}
{
// Throwing an error during `compose` (after finishing all readable data)
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
// eslint-disable-next-line no-unused-vars,no-empty
for await (const chunk of stream) {
}
throw new Error('boom');
});
assert.rejects(
stream.toArray(),
/boom/,
).then(common.mustCall());
}
{
// AbortSignal
const ac = new AbortController();
const stream = Readable.from([1, 2, 3, 4, 5])
.compose(async function *(source) {
// Should not reach here
for await (const chunk of source) {
yield chunk;
}
}, { signal: ac.signal });
ac.abort();
assert.rejects(async () => {
for await (const item of stream) {
assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}
{
assert.throws(
() => Readable.from(['a']).compose(Readable.from(['b'])),
{ code: 'ERR_INVALID_ARG_VALUE' }
);
}
{
assert.throws(
() => Readable.from(['a']).compose(),
{ code: 'ERR_INVALID_ARG_TYPE' }
);
}