Skip to content

Commit 8ad64b8

Browse files
ronagcodebytere
authored andcommittedMar 1, 2020
stream: support passing generator functions into pipeline()
Backport-PR-URL: #31975 PR-URL: #31223 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 4d05508 commit 8ad64b8

File tree

4 files changed

+633
-31
lines changed

4 files changed

+633
-31
lines changed
 

‎doc/api/stream.md

+43-9
Original file line numberDiff line numberDiff line change
@@ -1555,17 +1555,30 @@ const cleanup = finished(rs, (err) => {
15551555
});
15561556
```
15571557

1558-
### `stream.pipeline(...streams, callback)`
1558+
### `stream.pipeline(source, ...transforms, destination, callback)`
15591559
<!-- YAML
15601560
added: v10.0.0
1561-
-->
1562-
1563-
* `...streams` {Stream} Two or more streams to pipe between.
1561+
changes:
1562+
- version: REPLACEME
1563+
pr-url: https://github.com/nodejs/node/pull/31223
1564+
description: Add support for async generators.
1565+
-->
1566+
1567+
* `source` {Stream|Iterable|AsyncIterable|Function}
1568+
* Returns: {Iterable|AsyncIterable}
1569+
* `...transforms` {Stream|Function}
1570+
* `source` {AsyncIterable}
1571+
* Returns: {AsyncIterable}
1572+
* `destination` {Stream|Function}
1573+
* `source` {AsyncIterable}
1574+
* Returns: {AsyncIterable|Promise}
15641575
* `callback` {Function} Called when the pipeline is fully done.
15651576
* `err` {Error}
1577+
* `val` Resolved value of `Promise` returned by `destination`.
1578+
* Returns: {Stream}
15661579

1567-
A module method to pipe between streams forwarding errors and properly cleaning
1568-
up and provide a callback when the pipeline is complete.
1580+
A module method to pipe between streams and generators forwarding errors and
1581+
properly cleaning up and provide a callback when the pipeline is complete.
15691582

15701583
```js
15711584
const { pipeline } = require('stream');
@@ -1608,6 +1621,28 @@ async function run() {
16081621
run().catch(console.error);
16091622
```
16101623

1624+
The `pipeline` API also supports async generators:
1625+
1626+
```js
1627+
const pipeline = util.promisify(stream.pipeline);
1628+
const fs = require('fs').promises;
1629+
1630+
async function run() {
1631+
await pipeline(
1632+
fs.createReadStream('lowercase.txt'),
1633+
async function* (source) {
1634+
for await (const chunk of source) {
1635+
yield String(chunk).toUpperCase();
1636+
}
1637+
},
1638+
fs.createWriteStream('uppercase.txt')
1639+
);
1640+
console.log('Pipeline succeeded.');
1641+
}
1642+
1643+
run().catch(console.error);
1644+
```
1645+
16111646
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
16121647
* `Readable` streams which have emitted `'end'` or `'close'`.
16131648
* `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -2707,8 +2742,7 @@ const pipeline = util.promisify(stream.pipeline);
27072742
const writable = fs.createWriteStream('./file');
27082743

27092744
(async function() {
2710-
const readable = Readable.from(iterable);
2711-
await pipeline(readable, writable);
2745+
await pipeline(iterable, writable);
27122746
})();
27132747
```
27142748

@@ -2843,7 +2877,7 @@ contain multi-byte characters.
28432877
[`stream.cork()`]: #stream_writable_cork
28442878
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
28452879
[`stream.pipe()`]: #stream_readable_pipe_destination_options
2846-
[`stream.pipeline()`]: #stream_stream_pipeline_streams_callback
2880+
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
28472881
[`stream.uncork()`]: #stream_writable_uncork
28482882
[`stream.unpipe()`]: #stream_readable_unpipe_destination
28492883
[`stream.wrap()`]: #stream_readable_wrap_stream

‎lib/internal/streams/pipeline.js

+191-22
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,37 @@
55

66
const {
77
ArrayIsArray,
8+
SymbolAsyncIterator,
9+
SymbolIterator
810
} = primordials;
911

1012
let eos;
1113

1214
const { once } = require('internal/util');
1315
const {
16+
ERR_INVALID_ARG_TYPE,
17+
ERR_INVALID_RETURN_VALUE,
1418
ERR_INVALID_CALLBACK,
1519
ERR_MISSING_ARGS,
1620
ERR_STREAM_DESTROYED
1721
} = require('internal/errors').codes;
1822

23+
let EE;
24+
let PassThrough;
25+
let createReadableStreamAsyncIterator;
26+
1927
function isRequest(stream) {
2028
return stream && stream.setHeader && typeof stream.abort === 'function';
2129
}
2230

31+
function destroyStream(stream, err) {
32+
// request.destroy just do .end - .abort is what we want
33+
if (isRequest(stream)) return stream.abort();
34+
if (isRequest(stream.req)) return stream.req.abort();
35+
if (typeof stream.destroy === 'function') return stream.destroy(err);
36+
if (typeof stream.close === 'function') return stream.close();
37+
}
38+
2339
function destroyer(stream, reading, writing, callback) {
2440
callback = once(callback);
2541

@@ -41,19 +57,12 @@ function destroyer(stream, reading, writing, callback) {
4157
if (destroyed) return;
4258
destroyed = true;
4359

44-
// request.destroy just do .end - .abort is what we want
45-
if (isRequest(stream)) return stream.abort();
46-
if (isRequest(stream.req)) return stream.req.abort();
47-
if (typeof stream.destroy === 'function') return stream.destroy(err);
60+
destroyStream(stream, err);
4861

4962
callback(err || new ERR_STREAM_DESTROYED('pipe'));
5063
};
5164
}
5265

53-
function pipe(from, to) {
54-
return from.pipe(to);
55-
}
56-
5766
function popCallback(streams) {
5867
// Streams should never be an empty array. It should always contain at least
5968
// a single stream. Therefore optimize for the average case instead of
@@ -63,8 +72,89 @@ function popCallback(streams) {
6372
return streams.pop();
6473
}
6574

75+
function isPromise(obj) {
76+
return !!(obj && typeof obj.then === 'function');
77+
}
78+
79+
function isReadable(obj) {
80+
return !!(obj && typeof obj.pipe === 'function');
81+
}
82+
83+
function isWritable(obj) {
84+
return !!(obj && typeof obj.write === 'function');
85+
}
86+
87+
function isStream(obj) {
88+
return isReadable(obj) || isWritable(obj);
89+
}
90+
91+
function isIterable(obj, isAsync) {
92+
if (!obj) return false;
93+
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
94+
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
95+
return typeof obj[SymbolAsyncIterator] === 'function' ||
96+
typeof obj[SymbolIterator] === 'function';
97+
}
98+
99+
function makeAsyncIterable(val) {
100+
if (isIterable(val)) {
101+
return val;
102+
} else if (isReadable(val)) {
103+
// Legacy streams are not Iterable.
104+
return _fromReadable(val);
105+
} else {
106+
throw new ERR_INVALID_ARG_TYPE(
107+
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
108+
}
109+
}
110+
111+
async function* _fromReadable(val) {
112+
if (!createReadableStreamAsyncIterator) {
113+
createReadableStreamAsyncIterator =
114+
require('internal/streams/async_iterator');
115+
}
116+
117+
try {
118+
if (typeof val.read !== 'function') {
119+
// createReadableStreamAsyncIterator does not support
120+
// v1 streams. Convert it into a v2 stream.
121+
122+
if (!PassThrough) {
123+
PassThrough = require('_stream_passthrough');
124+
}
125+
126+
const pt = new PassThrough();
127+
val
128+
.on('error', (err) => pt.destroy(err))
129+
.pipe(pt);
130+
yield* createReadableStreamAsyncIterator(pt);
131+
} else {
132+
yield* createReadableStreamAsyncIterator(val);
133+
}
134+
} finally {
135+
destroyStream(val);
136+
}
137+
}
138+
139+
async function pump(iterable, writable, finish) {
140+
if (!EE) {
141+
EE = require('events');
142+
}
143+
try {
144+
for await (const chunk of iterable) {
145+
if (!writable.write(chunk)) {
146+
if (writable.destroyed) return;
147+
await EE.once(writable, 'drain');
148+
}
149+
}
150+
writable.end();
151+
} catch (err) {
152+
finish(err);
153+
}
154+
}
155+
66156
function pipeline(...streams) {
67-
const callback = popCallback(streams);
157+
const callback = once(popCallback(streams));
68158

69159
if (ArrayIsArray(streams[0])) streams = streams[0];
70160

@@ -73,25 +163,104 @@ function pipeline(...streams) {
73163
}
74164

75165
let error;
76-
const destroys = streams.map(function(stream, i) {
166+
const destroys = [];
167+
168+
function finish(err, val, final) {
169+
if (!error && err) {
170+
error = err;
171+
}
172+
173+
if (error || final) {
174+
for (const destroy of destroys) {
175+
destroy(error);
176+
}
177+
}
178+
179+
if (final) {
180+
callback(error, val);
181+
}
182+
}
183+
184+
function wrap(stream, reading, writing, final) {
185+
destroys.push(destroyer(stream, reading, writing, (err) => {
186+
finish(err, null, final);
187+
}));
188+
}
189+
190+
let ret;
191+
for (let i = 0; i < streams.length; i++) {
192+
const stream = streams[i];
77193
const reading = i < streams.length - 1;
78194
const writing = i > 0;
79-
return destroyer(stream, reading, writing, function(err) {
80-
if (!error) error = err;
81-
if (err) {
82-
for (const destroy of destroys) {
83-
destroy(err);
195+
196+
if (isStream(stream)) {
197+
wrap(stream, reading, writing, !reading);
198+
}
199+
200+
if (i === 0) {
201+
if (typeof stream === 'function') {
202+
ret = stream();
203+
if (!isIterable(ret)) {
204+
throw new ERR_INVALID_RETURN_VALUE(
205+
'Iterable, AsyncIterable or Stream', 'source', ret);
84206
}
207+
} else if (isIterable(stream) || isReadable(stream)) {
208+
ret = stream;
209+
} else {
210+
throw new ERR_INVALID_ARG_TYPE(
211+
'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
212+
stream);
85213
}
86-
if (reading) return;
87-
for (const destroy of destroys) {
88-
destroy();
214+
} else if (typeof stream === 'function') {
215+
ret = makeAsyncIterable(ret);
216+
ret = stream(ret);
217+
218+
if (reading) {
219+
if (!isIterable(ret, true)) {
220+
throw new ERR_INVALID_RETURN_VALUE(
221+
'AsyncIterable', `transform[${i - 1}]`, ret);
222+
}
223+
} else {
224+
if (!PassThrough) {
225+
PassThrough = require('_stream_passthrough');
226+
}
227+
228+
const pt = new PassThrough();
229+
if (isPromise(ret)) {
230+
ret
231+
.then((val) => {
232+
pt.end(val);
233+
finish(null, val, true);
234+
})
235+
.catch((err) => {
236+
finish(err, null, true);
237+
});
238+
} else if (isIterable(ret, true)) {
239+
pump(ret, pt, finish);
240+
} else {
241+
throw new ERR_INVALID_RETURN_VALUE(
242+
'AsyncIterable or Promise', 'destination', ret);
243+
}
244+
245+
ret = pt;
246+
wrap(ret, true, false, true);
89247
}
90-
callback(error);
91-
});
92-
});
248+
} else if (isStream(stream)) {
249+
if (isReadable(ret)) {
250+
ret.pipe(stream);
251+
} else {
252+
ret = makeAsyncIterable(ret);
253+
pump(ret, stream, finish);
254+
}
255+
ret = stream;
256+
} else {
257+
const name = reading ? `transform[${i - 1}]` : 'destination';
258+
throw new ERR_INVALID_ARG_TYPE(
259+
name, ['Stream', 'Function'], ret);
260+
}
261+
}
93262

94-
return streams.reduce(pipe);
263+
return ret;
95264
}
96265

97266
module.exports = pipeline;

‎test/parallel/test-stream-pipeline.js

+397
Original file line numberDiff line numberDiff line change
@@ -516,3 +516,400 @@ const { promisify } = require('util');
516516
}).on('error', common.mustNotCall());
517517
});
518518
}
519+
520+
{
521+
let res = '';
522+
const w = new Writable({
523+
write(chunk, encoding, callback) {
524+
res += chunk;
525+
callback();
526+
}
527+
});
528+
pipeline(function*() {
529+
yield 'hello';
530+
yield 'world';
531+
}(), w, common.mustCall((err) => {
532+
assert.ok(!err);
533+
assert.strictEqual(res, 'helloworld');
534+
}));
535+
}
536+
537+
{
538+
let res = '';
539+
const w = new Writable({
540+
write(chunk, encoding, callback) {
541+
res += chunk;
542+
callback();
543+
}
544+
});
545+
pipeline(async function*() {
546+
await Promise.resolve();
547+
yield 'hello';
548+
yield 'world';
549+
}(), w, common.mustCall((err) => {
550+
assert.ok(!err);
551+
assert.strictEqual(res, 'helloworld');
552+
}));
553+
}
554+
555+
{
556+
let res = '';
557+
const w = new Writable({
558+
write(chunk, encoding, callback) {
559+
res += chunk;
560+
callback();
561+
}
562+
});
563+
pipeline(function*() {
564+
yield 'hello';
565+
yield 'world';
566+
}, w, common.mustCall((err) => {
567+
assert.ok(!err);
568+
assert.strictEqual(res, 'helloworld');
569+
}));
570+
}
571+
572+
{
573+
let res = '';
574+
const w = new Writable({
575+
write(chunk, encoding, callback) {
576+
res += chunk;
577+
callback();
578+
}
579+
});
580+
pipeline(async function*() {
581+
await Promise.resolve();
582+
yield 'hello';
583+
yield 'world';
584+
}, w, common.mustCall((err) => {
585+
assert.ok(!err);
586+
assert.strictEqual(res, 'helloworld');
587+
}));
588+
}
589+
590+
{
591+
let res = '';
592+
pipeline(async function*() {
593+
await Promise.resolve();
594+
yield 'hello';
595+
yield 'world';
596+
}, async function*(source) {
597+
for await (const chunk of source) {
598+
yield chunk.toUpperCase();
599+
}
600+
}, async function(source) {
601+
for await (const chunk of source) {
602+
res += chunk;
603+
}
604+
}, common.mustCall((err) => {
605+
assert.ok(!err);
606+
assert.strictEqual(res, 'HELLOWORLD');
607+
}));
608+
}
609+
610+
{
611+
pipeline(async function*() {
612+
await Promise.resolve();
613+
yield 'hello';
614+
yield 'world';
615+
}, async function*(source) {
616+
const ret = [];
617+
for await (const chunk of source) {
618+
ret.push(chunk.toUpperCase());
619+
}
620+
yield ret;
621+
}, async function(source) {
622+
let ret = '';
623+
for await (const chunk of source) {
624+
ret += chunk;
625+
}
626+
return ret;
627+
}, common.mustCall((err, val) => {
628+
assert.ok(!err);
629+
assert.strictEqual(val, 'HELLOWORLD');
630+
}));
631+
}
632+
633+
{
634+
// AsyncIterable destination is returned and finalizes.
635+
636+
const ret = pipeline(async function*() {
637+
await Promise.resolve();
638+
yield 'hello';
639+
}, async function*(source) {
640+
for await (const chunk of source) {
641+
chunk;
642+
}
643+
}, common.mustCall((err) => {
644+
assert.strictEqual(err, undefined);
645+
}));
646+
ret.resume();
647+
assert.strictEqual(typeof ret.pipe, 'function');
648+
}
649+
650+
{
651+
// AsyncFunction destination is not returned and error is
652+
// propagated.
653+
654+
const ret = pipeline(async function*() {
655+
await Promise.resolve();
656+
throw new Error('kaboom');
657+
}, async function*(source) {
658+
for await (const chunk of source) {
659+
chunk;
660+
}
661+
}, common.mustCall((err) => {
662+
assert.strictEqual(err.message, 'kaboom');
663+
}));
664+
ret.resume();
665+
assert.strictEqual(typeof ret.pipe, 'function');
666+
}
667+
668+
{
669+
const s = new PassThrough();
670+
pipeline(async function*() {
671+
throw new Error('kaboom');
672+
}, s, common.mustCall((err) => {
673+
assert.strictEqual(err.message, 'kaboom');
674+
assert.strictEqual(s.destroyed, true);
675+
}));
676+
}
677+
678+
{
679+
const s = new PassThrough();
680+
pipeline(async function*() {
681+
throw new Error('kaboom');
682+
}(), s, common.mustCall((err) => {
683+
assert.strictEqual(err.message, 'kaboom');
684+
assert.strictEqual(s.destroyed, true);
685+
}));
686+
}
687+
688+
{
689+
const s = new PassThrough();
690+
pipeline(function*() {
691+
throw new Error('kaboom');
692+
}, s, common.mustCall((err, val) => {
693+
assert.strictEqual(err.message, 'kaboom');
694+
assert.strictEqual(s.destroyed, true);
695+
}));
696+
}
697+
698+
{
699+
const s = new PassThrough();
700+
pipeline(function*() {
701+
throw new Error('kaboom');
702+
}(), s, common.mustCall((err, val) => {
703+
assert.strictEqual(err.message, 'kaboom');
704+
assert.strictEqual(s.destroyed, true);
705+
}));
706+
}
707+
708+
{
709+
const s = new PassThrough();
710+
pipeline(async function*() {
711+
await Promise.resolve();
712+
yield 'hello';
713+
yield 'world';
714+
}, s, async function(source) {
715+
for await (const chunk of source) {
716+
chunk;
717+
throw new Error('kaboom');
718+
}
719+
}, common.mustCall((err, val) => {
720+
assert.strictEqual(err.message, 'kaboom');
721+
assert.strictEqual(s.destroyed, true);
722+
}));
723+
}
724+
725+
{
726+
const s = new PassThrough();
727+
const ret = pipeline(function() {
728+
return ['hello', 'world'];
729+
}, s, async function*(source) {
730+
for await (const chunk of source) {
731+
chunk;
732+
throw new Error('kaboom');
733+
}
734+
}, common.mustCall((err) => {
735+
assert.strictEqual(err.message, 'kaboom');
736+
assert.strictEqual(s.destroyed, true);
737+
}));
738+
ret.resume();
739+
assert.strictEqual(typeof ret.pipe, 'function');
740+
}
741+
742+
{
743+
// Legacy streams without async iterator.
744+
745+
const s = new PassThrough();
746+
s.push('asd');
747+
s.push(null);
748+
s[Symbol.asyncIterator] = null;
749+
let ret = '';
750+
pipeline(s, async function(source) {
751+
for await (const chunk of source) {
752+
ret += chunk;
753+
}
754+
}, common.mustCall((err) => {
755+
assert.strictEqual(err, undefined);
756+
assert.strictEqual(ret, 'asd');
757+
assert.strictEqual(s.destroyed, true);
758+
}));
759+
}
760+
761+
{
762+
// v1 streams without read().
763+
764+
const s = new Stream();
765+
process.nextTick(() => {
766+
s.emit('data', 'asd');
767+
s.emit('end');
768+
});
769+
s.close = common.mustCall();
770+
let ret = '';
771+
pipeline(s, async function(source) {
772+
for await (const chunk of source) {
773+
ret += chunk;
774+
}
775+
}, common.mustCall((err) => {
776+
assert.strictEqual(err, undefined);
777+
assert.strictEqual(ret, 'asd');
778+
assert.strictEqual(s.destroyed, true);
779+
}));
780+
}
781+
782+
{
783+
// v1 error streams without read().
784+
785+
const s = new Stream();
786+
process.nextTick(() => {
787+
s.emit('error', new Error('kaboom'));
788+
});
789+
s.destroy = common.mustCall();
790+
pipeline(s, async function(source) {
791+
}, common.mustCall((err) => {
792+
assert.strictEqual(err.message, 'kaboom');
793+
}));
794+
}
795+
796+
{
797+
const s = new PassThrough();
798+
assert.throws(() => {
799+
pipeline(function(source) {
800+
}, s, () => {});
801+
}, (err) => {
802+
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
803+
assert.strictEqual(s.destroyed, false);
804+
return true;
805+
});
806+
}
807+
808+
{
809+
const s = new PassThrough();
810+
assert.throws(() => {
811+
pipeline(s, function(source) {
812+
}, s, () => {});
813+
}, (err) => {
814+
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
815+
assert.strictEqual(s.destroyed, false);
816+
return true;
817+
});
818+
}
819+
820+
{
821+
const s = new PassThrough();
822+
assert.throws(() => {
823+
pipeline(s, function(source) {
824+
}, () => {});
825+
}, (err) => {
826+
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
827+
assert.strictEqual(s.destroyed, false);
828+
return true;
829+
});
830+
}
831+
832+
{
833+
const s = new PassThrough();
834+
assert.throws(() => {
835+
pipeline(s, function*(source) {
836+
}, () => {});
837+
}, (err) => {
838+
assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
839+
assert.strictEqual(s.destroyed, false);
840+
return true;
841+
});
842+
}
843+
844+
{
845+
let res = '';
846+
pipeline(async function*() {
847+
await Promise.resolve();
848+
yield 'hello';
849+
yield 'world';
850+
}, new Transform({
851+
transform(chunk, encoding, cb) {
852+
cb(new Error('kaboom'));
853+
}
854+
}), async function(source) {
855+
for await (const chunk of source) {
856+
res += chunk;
857+
}
858+
}, common.mustCall((err) => {
859+
assert.strictEqual(err.message, 'kaboom');
860+
assert.strictEqual(res, '');
861+
}));
862+
}
863+
864+
{
865+
let res = '';
866+
pipeline(async function*() {
867+
await Promise.resolve();
868+
yield 'hello';
869+
yield 'world';
870+
}, new Transform({
871+
transform(chunk, encoding, cb) {
872+
process.nextTick(cb, new Error('kaboom'));
873+
}
874+
}), async function(source) {
875+
for await (const chunk of source) {
876+
res += chunk;
877+
}
878+
}, common.mustCall((err) => {
879+
assert.strictEqual(err.message, 'kaboom');
880+
assert.strictEqual(res, '');
881+
}));
882+
}
883+
884+
{
885+
let res = '';
886+
pipeline(async function*() {
887+
await Promise.resolve();
888+
yield 'hello';
889+
yield 'world';
890+
}, new Transform({
891+
decodeStrings: false,
892+
transform(chunk, encoding, cb) {
893+
cb(null, chunk.toUpperCase());
894+
}
895+
}), async function(source) {
896+
for await (const chunk of source) {
897+
res += chunk;
898+
}
899+
}, common.mustCall((err) => {
900+
assert.ok(!err);
901+
assert.strictEqual(res, 'HELLOWORLD');
902+
}));
903+
}
904+
905+
{
906+
// Ensure no unhandled rejection from async function.
907+
908+
pipeline(async function*() {
909+
yield 'hello';
910+
}, async function(source) {
911+
throw new Error('kaboom');
912+
}, common.mustCall((err) => {
913+
assert.strictEqual(err.message, 'kaboom');
914+
}));
915+
}

‎tools/doc/type-parser.js

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ const customTypesMap = {
2828

2929
'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',
3030

31+
'AsyncIterable': 'https://tc39.github.io/ecma262/#sec-asynciterable-interface',
32+
3133
'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`,
3234

3335
'Iterable':

0 commit comments

Comments
 (0)
Please sign in to comment.