/
test-stream-flatMap.js
131 lines (120 loc) Β· 3.53 KB
/
test-stream-flatMap.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
'use strict';
const common = require('../common');
const fixtures = require('../common/fixtures');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { createReadStream } = require('fs');
function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}
{
// flatMap works on synchronous streams with a synchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x + x]).toArray(),
[2, 4, 6, 8, 10]
);
assert.deepStrictEqual(
await oneTo5().flatMap(() => []).toArray(),
[]
);
assert.deepStrictEqual(
await oneTo5().flatMap((x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}
{
// flatMap works on sync/async streams with an asynchronous mapper
(async () => {
assert.deepStrictEqual(
await oneTo5().flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
const asyncOneTo5 = oneTo5().map(async (x) => x);
assert.deepStrictEqual(
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);
})().then(common.mustCall());
}
{
// flatMap works on a stream where mapping returns a stream
(async () => {
const result = await oneTo5().flatMap(async (x) => {
return Readable.from([x, x]);
}).toArray();
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
})().then(common.mustCall());
// flatMap works on an objectMode stream where mappign returns a stream
(async () => {
const result = await oneTo5().flatMap(() => {
return createReadStream(fixtures.path('x.txt'));
}).toArray();
// The resultant stream is in object mode so toArray shouldn't flatten
assert.strictEqual(result.length, 5);
assert.deepStrictEqual(
Buffer.concat(result).toString(),
'xyz\n'.repeat(5)
);
})().then(common.mustCall());
}
{
// Concurrency + AbortSignal
const ac = new AbortController();
const stream = oneTo5().flatMap(common.mustCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}, 2), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
setImmediate(() => {
ac.abort();
});
}
{
// Already aborted AbortSignal
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
await setTimeout(100, { signal });
}), { signal: AbortSignal.abort() });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());
}
{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).flatMap(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = oneTo5().flatMap((x) => x);
assert.strictEqual(stream.readable, true);
}