forked from nodejs/node
/
test-stream-some-find-every.mjs
172 lines (155 loc) · 6.1 KB
/
test-stream-some-find-every.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import * as common from '../common/index.mjs';
import { setTimeout } from 'timers/promises';
import { Readable } from 'stream';
import assert from 'assert';
function oneTo5() {
return Readable.from([1, 2, 3, 4, 5]);
}
function oneTo5Async() {
return oneTo5().map(async (x) => {
await Promise.resolve();
return x;
});
}
{
// Some, find, and every work with a synchronous stream and predicate
assert.strictEqual(await oneTo5().some((x) => x > 3), true);
assert.strictEqual(await oneTo5().every((x) => x > 3), false);
assert.strictEqual(await oneTo5().find((x) => x > 3), 4);
assert.strictEqual(await oneTo5().some((x) => x > 6), false);
assert.strictEqual(await oneTo5().every((x) => x < 6), true);
assert.strictEqual(await oneTo5().find((x) => x > 6), undefined);
assert.strictEqual(await Readable.from([]).some(() => true), false);
assert.strictEqual(await Readable.from([]).every(() => true), true);
assert.strictEqual(await Readable.from([]).find(() => true), undefined);
}
{
// Some, find, and every work with an asynchronous stream and synchronous predicate
assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
assert.strictEqual(await oneTo5Async().find((x) => x > 3), 4);
assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
assert.strictEqual(await oneTo5Async().find((x) => x > 6), undefined);
}
{
// Some, find, and every work on synchronous streams with an asynchronous predicate
assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
assert.strictEqual(await oneTo5().find(async (x) => x > 3), 4);
assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
assert.strictEqual(await oneTo5().find(async (x) => x > 6), undefined);
}
{
// Some, find, and every work on asynchronous streams with an asynchronous predicate
assert.strictEqual(await oneTo5Async().some(async (x) => x > 3), true);
assert.strictEqual(await oneTo5Async().every(async (x) => x > 3), false);
assert.strictEqual(await oneTo5Async().find(async (x) => x > 3), 4);
assert.strictEqual(await oneTo5Async().some(async (x) => x > 6), false);
assert.strictEqual(await oneTo5Async().every(async (x) => x < 6), true);
assert.strictEqual(await oneTo5Async().find(async (x) => x > 6), undefined);
}
{
async function checkDestroyed(stream) {
await setTimeout();
assert.strictEqual(stream.destroyed, true);
}
{
// Some, find, and every short circuit
const someStream = oneTo5();
await someStream.some(common.mustCall((x) => x > 2, 3));
await checkDestroyed(someStream);
const everyStream = oneTo5();
await everyStream.every(common.mustCall((x) => x < 3, 3));
await checkDestroyed(everyStream);
const findStream = oneTo5();
await findStream.find(common.mustCall((x) => x > 1, 2));
await checkDestroyed(findStream);
// When short circuit isn't possible the whole stream is iterated
await oneTo5().some(common.mustCall(() => false, 5));
await oneTo5().every(common.mustCall(() => true, 5));
await oneTo5().find(common.mustCall(() => false, 5));
}
{
// Some, find, and every short circuit async stream/predicate
const someStream = oneTo5Async();
await someStream.some(common.mustCall(async (x) => x > 2, 3));
await checkDestroyed(someStream);
const everyStream = oneTo5Async();
await everyStream.every(common.mustCall(async (x) => x < 3, 3));
await checkDestroyed(everyStream);
const findStream = oneTo5Async();
await findStream.find(common.mustCall(async (x) => x > 1, 2));
await checkDestroyed(findStream);
// When short circuit isn't possible the whole stream is iterated
await oneTo5Async().some(common.mustCall(async () => false, 5));
await oneTo5Async().every(common.mustCall(async () => true, 5));
await oneTo5Async().find(common.mustCall(async () => false, 5));
}
}
{
// Concurrency doesn't affect which value is found.
const found = await Readable.from([1, 2]).find(async (val) => {
if (val === 1) {
await setTimeout(100);
}
return true;
}, { concurrency: 2 });
assert.strictEqual(found, 1);
}
{
// Support for AbortSignal
for (const op of ['some', 'every', 'find']) {
{
const ac = new AbortController();
assert.rejects(Readable.from([1, 2, 3])[op](
() => new Promise(() => { }),
{ signal: ac.signal }
), {
name: 'AbortError',
}, `${op} should abort correctly with sync abort`).then(common.mustCall());
ac.abort();
}
{
// Support for pre-aborted AbortSignal
assert.rejects(Readable.from([1, 2, 3])[op](
() => new Promise(() => { }),
{ signal: AbortSignal.abort() }
), {
name: 'AbortError',
}, `${op} should abort with pre-aborted abort controller`).then(common.mustCall());
}
}
}
{
// Error cases
for (const op of ['some', 'every', 'find']) {
assert.rejects(async () => {
await Readable.from([1])[op](1);
}, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid function`).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1])[op]((x) => x, {
concurrency: 'Foo'
});
}, /ERR_OUT_OF_RANGE/, `${op} should throw for invalid concurrency`).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1])[op]((x) => x, 1);
}, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid concurrency`).then(common.mustCall());
assert.rejects(async () => {
await Readable.from([1])[op]((x) => x, {
signal: true
});
}, /ERR_INVALID_ARG_TYPE/, `${op} should throw for invalid signal`).then(common.mustCall());
}
}
{
for (const op of ['some', 'every', 'find']) {
const stream = oneTo5();
Object.defineProperty(stream, 'map', {
value: common.mustNotCall(() => {}),
});
// Check that map isn't getting called.
stream[op](() => {});
}
}