Skip to content

Commit fb14acb

Browse files
mcollinaBethGriggs
authored andcommittedDec 2, 2020
stream: move to internal/streams
Move all the streams constructors to internal/streams and avoid a circular dependencies between the modules. See: nodejs/readable-stream#348 PR-URL: #35239 Backport-PR-URL: #35349 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Daijiro Wachi <daijiro.wachi@gmail.com>
1 parent d58a466 commit fb14acb

14 files changed

+2568
-2536
lines changed
 

‎lib/_stream_duplex.js

+2-104
Original file line numberDiff line numberDiff line change
@@ -1,108 +1,6 @@
1-
// Copyright Joyent, Inc. and other Node contributors.
2-
//
3-
// Permission is hereby granted, free of charge, to any person obtaining a
4-
// copy of this software and associated documentation files (the
5-
// "Software"), to deal in the Software without restriction, including
6-
// without limitation the rights to use, copy, modify, merge, publish,
7-
// distribute, sublicense, and/or sell copies of the Software, and to permit
8-
// persons to whom the Software is furnished to do so, subject to the
9-
// following conditions:
10-
//
11-
// The above copyright notice and this permission notice shall be included
12-
// in all copies or substantial portions of the Software.
13-
//
14-
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15-
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16-
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17-
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18-
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19-
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20-
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21-
22-
// a duplex stream is just a stream that is both readable and writable.
23-
// Since JS doesn't have multiple prototype inheritance, this class
24-
// prototypically inherits from Readable, and then parasitically from
25-
// Writable.
26-
271
'use strict';
282

29-
const {
30-
ObjectDefineProperties,
31-
ObjectGetOwnPropertyDescriptor,
32-
ObjectKeys,
33-
ObjectSetPrototypeOf,
34-
} = primordials;
3+
// TODO(mcollina): deprecate this file
354

5+
const Duplex = require('internal/streams/duplex');
366
module.exports = Duplex;
37-
38-
const Readable = require('_stream_readable');
39-
const Writable = require('_stream_writable');
40-
41-
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
42-
ObjectSetPrototypeOf(Duplex, Readable);
43-
44-
{
45-
// Allow the keys array to be GC'ed.
46-
for (const method of ObjectKeys(Writable.prototype)) {
47-
if (!Duplex.prototype[method])
48-
Duplex.prototype[method] = Writable.prototype[method];
49-
}
50-
}
51-
52-
function Duplex(options) {
53-
if (!(this instanceof Duplex))
54-
return new Duplex(options);
55-
56-
Readable.call(this, options);
57-
Writable.call(this, options);
58-
this.allowHalfOpen = true;
59-
60-
if (options) {
61-
if (options.readable === false)
62-
this.readable = false;
63-
64-
if (options.writable === false)
65-
this.writable = false;
66-
67-
if (options.allowHalfOpen === false) {
68-
this.allowHalfOpen = false;
69-
}
70-
}
71-
}
72-
73-
ObjectDefineProperties(Duplex.prototype, {
74-
writable:
75-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'),
76-
writableHighWaterMark:
77-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'),
78-
writableObjectMode:
79-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'),
80-
writableBuffer:
81-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'),
82-
writableLength:
83-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'),
84-
writableFinished:
85-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'),
86-
writableCorked:
87-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
88-
writableEnded:
89-
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
90-
91-
destroyed: {
92-
get() {
93-
if (this._readableState === undefined ||
94-
this._writableState === undefined) {
95-
return false;
96-
}
97-
return this._readableState.destroyed && this._writableState.destroyed;
98-
},
99-
set(value) {
100-
// Backward compatibility, the user is explicitly
101-
// managing destroyed.
102-
if (this._readableState && this._writableState) {
103-
this._readableState.destroyed = value;
104-
this._writableState.destroyed = value;
105-
}
106-
}
107-
}
108-
});

‎lib/_stream_passthrough.js

+2-43
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,6 @@
1-
// Copyright Joyent, Inc. and other Node contributors.
2-
//
3-
// Permission is hereby granted, free of charge, to any person obtaining a
4-
// copy of this software and associated documentation files (the
5-
// "Software"), to deal in the Software without restriction, including
6-
// without limitation the rights to use, copy, modify, merge, publish,
7-
// distribute, sublicense, and/or sell copies of the Software, and to permit
8-
// persons to whom the Software is furnished to do so, subject to the
9-
// following conditions:
10-
//
11-
// The above copyright notice and this permission notice shall be included
12-
// in all copies or substantial portions of the Software.
13-
//
14-
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15-
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16-
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17-
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18-
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19-
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20-
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21-
22-
// a passthrough stream.
23-
// basically just the most minimal sort of Transform stream.
24-
// Every written chunk gets output as-is.
25-
261
'use strict';
272

28-
const {
29-
ObjectSetPrototypeOf,
30-
} = primordials;
3+
// TODO(mcollina): deprecate this file
314

5+
const PassThrough = require('internal/streams/passthrough');
326
module.exports = PassThrough;
33-
34-
const Transform = require('_stream_transform');
35-
ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype);
36-
ObjectSetPrototypeOf(PassThrough, Transform);
37-
38-
function PassThrough(options) {
39-
if (!(this instanceof PassThrough))
40-
return new PassThrough(options);
41-
42-
Transform.call(this, options);
43-
}
44-
45-
PassThrough.prototype._transform = function(chunk, encoding, cb) {
46-
cb(null, chunk);
47-
};

‎lib/_stream_readable.js

+2-1,359
Large diffs are not rendered by default.

‎lib/_stream_transform.js

+2-231
Original file line numberDiff line numberDiff line change
@@ -1,235 +1,6 @@
1-
// Copyright Joyent, Inc. and other Node contributors.
2-
//
3-
// Permission is hereby granted, free of charge, to any person obtaining a
4-
// copy of this software and associated documentation files (the
5-
// "Software"), to deal in the Software without restriction, including
6-
// without limitation the rights to use, copy, modify, merge, publish,
7-
// distribute, sublicense, and/or sell copies of the Software, and to permit
8-
// persons to whom the Software is furnished to do so, subject to the
9-
// following conditions:
10-
//
11-
// The above copyright notice and this permission notice shall be included
12-
// in all copies or substantial portions of the Software.
13-
//
14-
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15-
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16-
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17-
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18-
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19-
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20-
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21-
22-
// a transform stream is a readable/writable stream where you do
23-
// something with the data. Sometimes it's called a "filter",
24-
// but that's not a great name for it, since that implies a thing where
25-
// some bits pass through, and others are simply ignored. (That would
26-
// be a valid example of a transform, of course.)
27-
//
28-
// While the output is causally related to the input, it's not a
29-
// necessarily symmetric or synchronous transformation. For example,
30-
// a zlib stream might take multiple plain-text writes(), and then
31-
// emit a single compressed chunk some time in the future.
32-
//
33-
// Here's how this works:
34-
//
35-
// The Transform stream has all the aspects of the readable and writable
36-
// stream classes. When you write(chunk), that calls _write(chunk,cb)
37-
// internally, and returns false if there's a lot of pending writes
38-
// buffered up. When you call read(), that calls _read(n) until
39-
// there's enough pending readable data buffered up.
40-
//
41-
// In a transform stream, the written data is placed in a buffer. When
42-
// _read(n) is called, it transforms the queued up data, calling the
43-
// buffered _write cb's as it consumes chunks. If consuming a single
44-
// written chunk would result in multiple output chunks, then the first
45-
// outputted bit calls the readcb, and subsequent chunks just go into
46-
// the read buffer, and will cause it to emit 'readable' if necessary.
47-
//
48-
// This way, back-pressure is actually determined by the reading side,
49-
// since _read has to be called to start processing a new chunk. However,
50-
// a pathological inflate type of transform can cause excessive buffering
51-
// here. For example, imagine a stream where every byte of input is
52-
// interpreted as an integer from 0-255, and then results in that many
53-
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
54-
// 1kb of data being output. In this case, you could write a very small
55-
// amount of input, and end up with a very large amount of output. In
56-
// such a pathological inflating mechanism, there'd be no way to tell
57-
// the system to stop doing the transform. A single 4MB write could
58-
// cause the system to run out of memory.
59-
//
60-
// However, even in such a pathological case, only a single written chunk
61-
// would be consumed, and then the rest would wait (un-transformed) until
62-
// the results of the previous transformed chunk were consumed.
63-
641
'use strict';
652

66-
const {
67-
ObjectDefineProperty,
68-
ObjectSetPrototypeOf,
69-
Symbol
70-
} = primordials;
3+
// TODO(mcollina): deprecate this file
714

5+
const Transform = require('internal/streams/transform');
726
module.exports = Transform;
73-
const {
74-
ERR_METHOD_NOT_IMPLEMENTED,
75-
ERR_MULTIPLE_CALLBACK,
76-
ERR_TRANSFORM_ALREADY_TRANSFORMING,
77-
ERR_TRANSFORM_WITH_LENGTH_0
78-
} = require('internal/errors').codes;
79-
const Duplex = require('_stream_duplex');
80-
const internalUtil = require('internal/util');
81-
82-
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
83-
ObjectSetPrototypeOf(Transform, Duplex);
84-
85-
const kTransformState = Symbol('kTransformState');
86-
87-
function afterTransform(er, data) {
88-
const ts = this[kTransformState];
89-
ts.transforming = false;
90-
91-
const cb = ts.writecb;
92-
93-
if (cb === null) {
94-
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
95-
}
96-
97-
ts.writechunk = null;
98-
ts.writecb = null;
99-
100-
if (data != null) // Single equals check for both `null` and `undefined`
101-
this.push(data);
102-
103-
cb(er);
104-
105-
const rs = this._readableState;
106-
rs.reading = false;
107-
if (rs.needReadable || rs.length < rs.highWaterMark) {
108-
this._read(rs.highWaterMark);
109-
}
110-
}
111-
112-
113-
function Transform(options) {
114-
if (!(this instanceof Transform))
115-
return new Transform(options);
116-
117-
Duplex.call(this, options);
118-
119-
this[kTransformState] = {
120-
afterTransform: afterTransform.bind(this),
121-
needTransform: false,
122-
transforming: false,
123-
writecb: null,
124-
writechunk: null,
125-
writeencoding: null
126-
};
127-
128-
// We have implemented the _read method, and done the other things
129-
// that Readable wants before the first _read call, so unset the
130-
// sync guard flag.
131-
this._readableState.sync = false;
132-
133-
if (options) {
134-
if (typeof options.transform === 'function')
135-
this._transform = options.transform;
136-
137-
if (typeof options.flush === 'function')
138-
this._flush = options.flush;
139-
}
140-
141-
// When the writable side finishes, then flush out anything remaining.
142-
this.on('prefinish', prefinish);
143-
}
144-
145-
function prefinish() {
146-
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
147-
this._flush((er, data) => {
148-
done(this, er, data);
149-
});
150-
} else {
151-
done(this, null, null);
152-
}
153-
}
154-
155-
ObjectDefineProperty(Transform.prototype, '_transformState', {
156-
get: internalUtil.deprecate(function() {
157-
return this[kTransformState];
158-
}, 'Transform.prototype._transformState is deprecated', 'DEP0143'),
159-
set: internalUtil.deprecate(function(val) {
160-
this[kTransformState] = val;
161-
}, 'Transform.prototype._transformState is deprecated', 'DEP0143')
162-
});
163-
164-
Transform.prototype.push = function(chunk, encoding) {
165-
this[kTransformState].needTransform = false;
166-
return Duplex.prototype.push.call(this, chunk, encoding);
167-
};
168-
169-
// This is the part where you do stuff!
170-
// override this function in implementation classes.
171-
// 'chunk' is an input chunk.
172-
//
173-
// Call `push(newChunk)` to pass along transformed output
174-
// to the readable side. You may call 'push' zero or more times.
175-
//
176-
// Call `cb(err)` when you are done with this chunk. If you pass
177-
// an error, then that'll put the hurt on the whole operation. If you
178-
// never call cb(), then you'll never get another chunk.
179-
Transform.prototype._transform = function(chunk, encoding, cb) {
180-
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
181-
};
182-
183-
Transform.prototype._write = function(chunk, encoding, cb) {
184-
const ts = this[kTransformState];
185-
ts.writecb = cb;
186-
ts.writechunk = chunk;
187-
ts.writeencoding = encoding;
188-
if (!ts.transforming) {
189-
const rs = this._readableState;
190-
if (ts.needTransform ||
191-
rs.needReadable ||
192-
rs.length < rs.highWaterMark)
193-
this._read(rs.highWaterMark);
194-
}
195-
};
196-
197-
// Doesn't matter what the args are here.
198-
// _transform does all the work.
199-
// That we got here means that the readable side wants more data.
200-
Transform.prototype._read = function(n) {
201-
const ts = this[kTransformState];
202-
203-
if (ts.writechunk !== null && !ts.transforming) {
204-
ts.transforming = true;
205-
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
206-
} else {
207-
// Mark that we need a transform, so that any data that comes in
208-
// will get processed, now that we've asked for it.
209-
ts.needTransform = true;
210-
}
211-
};
212-
213-
214-
Transform.prototype._destroy = function(err, cb) {
215-
Duplex.prototype._destroy.call(this, err, (err2) => {
216-
cb(err2);
217-
});
218-
};
219-
220-
221-
function done(stream, er, data) {
222-
if (er)
223-
return stream.emit('error', er);
224-
225-
if (data != null) // Single equals check for both `null` and `undefined`
226-
stream.push(data);
227-
228-
// These two error cases are coherence checks that can likely not be tested.
229-
if (stream._writableState.length)
230-
throw new ERR_TRANSFORM_WITH_LENGTH_0();
231-
232-
if (stream[kTransformState].transforming)
233-
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
234-
return stream.push(null);
235-
}

‎lib/_stream_writable.js

+2-783
Large diffs are not rendered by default.

‎lib/internal/streams/duplex.js

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright Joyent, Inc. and other Node contributors.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the
5+
// "Software"), to deal in the Software without restriction, including
6+
// without limitation the rights to use, copy, modify, merge, publish,
7+
// distribute, sublicense, and/or sell copies of the Software, and to permit
8+
// persons to whom the Software is furnished to do so, subject to the
9+
// following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included
12+
// in all copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17+
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18+
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20+
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
22+
// a duplex stream is just a stream that is both readable and writable.
23+
// Since JS doesn't have multiple prototype inheritance, this class
24+
// prototypically inherits from Readable, and then parasitically from
25+
// Writable.
26+
27+
'use strict';
28+
29+
const {
30+
ObjectDefineProperties,
31+
ObjectGetOwnPropertyDescriptor,
32+
ObjectKeys,
33+
ObjectSetPrototypeOf,
34+
} = primordials;
35+
36+
module.exports = Duplex;
37+
38+
const Readable = require('internal/streams/readable');
39+
const Writable = require('internal/streams/writable');
40+
41+
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
42+
ObjectSetPrototypeOf(Duplex, Readable);
43+
44+
{
45+
// Allow the keys array to be GC'ed.
46+
for (const method of ObjectKeys(Writable.prototype)) {
47+
if (!Duplex.prototype[method])
48+
Duplex.prototype[method] = Writable.prototype[method];
49+
}
50+
}
51+
52+
function Duplex(options) {
53+
if (!(this instanceof Duplex))
54+
return new Duplex(options);
55+
56+
Readable.call(this, options);
57+
Writable.call(this, options);
58+
this.allowHalfOpen = true;
59+
60+
if (options) {
61+
if (options.readable === false)
62+
this.readable = false;
63+
64+
if (options.writable === false)
65+
this.writable = false;
66+
67+
if (options.allowHalfOpen === false) {
68+
this.allowHalfOpen = false;
69+
}
70+
}
71+
}
72+
73+
ObjectDefineProperties(Duplex.prototype, {
74+
writable:
75+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'),
76+
writableHighWaterMark:
77+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'),
78+
writableObjectMode:
79+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'),
80+
writableBuffer:
81+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'),
82+
writableLength:
83+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'),
84+
writableFinished:
85+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'),
86+
writableCorked:
87+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
88+
writableEnded:
89+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
90+
91+
destroyed: {
92+
get() {
93+
if (this._readableState === undefined ||
94+
this._writableState === undefined) {
95+
return false;
96+
}
97+
return this._readableState.destroyed && this._writableState.destroyed;
98+
},
99+
set(value) {
100+
// Backward compatibility, the user is explicitly
101+
// managing destroyed.
102+
if (this._readableState && this._writableState) {
103+
this._readableState.destroyed = value;
104+
this._writableState.destroyed = value;
105+
}
106+
}
107+
}
108+
});

‎lib/internal/streams/passthrough.js

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright Joyent, Inc. and other Node contributors.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the
5+
// "Software"), to deal in the Software without restriction, including
6+
// without limitation the rights to use, copy, modify, merge, publish,
7+
// distribute, sublicense, and/or sell copies of the Software, and to permit
8+
// persons to whom the Software is furnished to do so, subject to the
9+
// following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included
12+
// in all copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17+
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18+
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20+
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
22+
// a passthrough stream.
23+
// basically just the most minimal sort of Transform stream.
24+
// Every written chunk gets output as-is.
25+
26+
'use strict';
27+
28+
const {
29+
ObjectSetPrototypeOf,
30+
} = primordials;
31+
32+
module.exports = PassThrough;
33+
34+
const Transform = require('internal/streams/transform');
35+
ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype);
36+
ObjectSetPrototypeOf(PassThrough, Transform);
37+
38+
function PassThrough(options) {
39+
if (!(this instanceof PassThrough))
40+
return new PassThrough(options);
41+
42+
Transform.call(this, options);
43+
}
44+
45+
PassThrough.prototype._transform = function(chunk, encoding, cb) {
46+
cb(null, chunk);
47+
};

‎lib/internal/streams/readable.js

+1,362
Large diffs are not rendered by default.

‎lib/internal/streams/transform.js

+235
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
// Copyright Joyent, Inc. and other Node contributors.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the
5+
// "Software"), to deal in the Software without restriction, including
6+
// without limitation the rights to use, copy, modify, merge, publish,
7+
// distribute, sublicense, and/or sell copies of the Software, and to permit
8+
// persons to whom the Software is furnished to do so, subject to the
9+
// following conditions:
10+
//
11+
// The above copyright notice and this permission notice shall be included
12+
// in all copies or substantial portions of the Software.
13+
//
14+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17+
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18+
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20+
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
22+
// a transform stream is a readable/writable stream where you do
23+
// something with the data. Sometimes it's called a "filter",
24+
// but that's not a great name for it, since that implies a thing where
25+
// some bits pass through, and others are simply ignored. (That would
26+
// be a valid example of a transform, of course.)
27+
//
28+
// While the output is causally related to the input, it's not a
29+
// necessarily symmetric or synchronous transformation. For example,
30+
// a zlib stream might take multiple plain-text writes(), and then
31+
// emit a single compressed chunk some time in the future.
32+
//
33+
// Here's how this works:
34+
//
35+
// The Transform stream has all the aspects of the readable and writable
36+
// stream classes. When you write(chunk), that calls _write(chunk,cb)
37+
// internally, and returns false if there's a lot of pending writes
38+
// buffered up. When you call read(), that calls _read(n) until
39+
// there's enough pending readable data buffered up.
40+
//
41+
// In a transform stream, the written data is placed in a buffer. When
42+
// _read(n) is called, it transforms the queued up data, calling the
43+
// buffered _write cb's as it consumes chunks. If consuming a single
44+
// written chunk would result in multiple output chunks, then the first
45+
// outputted bit calls the readcb, and subsequent chunks just go into
46+
// the read buffer, and will cause it to emit 'readable' if necessary.
47+
//
48+
// This way, back-pressure is actually determined by the reading side,
49+
// since _read has to be called to start processing a new chunk. However,
50+
// a pathological inflate type of transform can cause excessive buffering
51+
// here. For example, imagine a stream where every byte of input is
52+
// interpreted as an integer from 0-255, and then results in that many
53+
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
54+
// 1kb of data being output. In this case, you could write a very small
55+
// amount of input, and end up with a very large amount of output. In
56+
// such a pathological inflating mechanism, there'd be no way to tell
57+
// the system to stop doing the transform. A single 4MB write could
58+
// cause the system to run out of memory.
59+
//
60+
// However, even in such a pathological case, only a single written chunk
61+
// would be consumed, and then the rest would wait (un-transformed) until
62+
// the results of the previous transformed chunk were consumed.
63+
64+
'use strict';
65+
66+
const {
67+
ObjectDefineProperty,
68+
ObjectSetPrototypeOf,
69+
Symbol
70+
} = primordials;
71+
72+
module.exports = Transform;
73+
const {
74+
ERR_METHOD_NOT_IMPLEMENTED,
75+
ERR_MULTIPLE_CALLBACK,
76+
ERR_TRANSFORM_ALREADY_TRANSFORMING,
77+
ERR_TRANSFORM_WITH_LENGTH_0
78+
} = require('internal/errors').codes;
79+
const Duplex = require('internal/streams/duplex');
80+
const internalUtil = require('internal/util');
81+
82+
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
83+
ObjectSetPrototypeOf(Transform, Duplex);
84+
85+
const kTransformState = Symbol('kTransformState');
86+
87+
function afterTransform(er, data) {
88+
const ts = this[kTransformState];
89+
ts.transforming = false;
90+
91+
const cb = ts.writecb;
92+
93+
if (cb === null) {
94+
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
95+
}
96+
97+
ts.writechunk = null;
98+
ts.writecb = null;
99+
100+
if (data != null) // Single equals check for both `null` and `undefined`
101+
this.push(data);
102+
103+
cb(er);
104+
105+
const rs = this._readableState;
106+
rs.reading = false;
107+
if (rs.needReadable || rs.length < rs.highWaterMark) {
108+
this._read(rs.highWaterMark);
109+
}
110+
}
111+
112+
113+
function Transform(options) {
114+
if (!(this instanceof Transform))
115+
return new Transform(options);
116+
117+
Duplex.call(this, options);
118+
119+
this[kTransformState] = {
120+
afterTransform: afterTransform.bind(this),
121+
needTransform: false,
122+
transforming: false,
123+
writecb: null,
124+
writechunk: null,
125+
writeencoding: null
126+
};
127+
128+
// We have implemented the _read method, and done the other things
129+
// that Readable wants before the first _read call, so unset the
130+
// sync guard flag.
131+
this._readableState.sync = false;
132+
133+
if (options) {
134+
if (typeof options.transform === 'function')
135+
this._transform = options.transform;
136+
137+
if (typeof options.flush === 'function')
138+
this._flush = options.flush;
139+
}
140+
141+
// When the writable side finishes, then flush out anything remaining.
142+
this.on('prefinish', prefinish);
143+
}
144+
145+
function prefinish() {
146+
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
147+
this._flush((er, data) => {
148+
done(this, er, data);
149+
});
150+
} else {
151+
done(this, null, null);
152+
}
153+
}
154+
155+
ObjectDefineProperty(Transform.prototype, '_transformState', {
156+
get: internalUtil.deprecate(function() {
157+
return this[kTransformState];
158+
}, 'Transform.prototype._transformState is deprecated', 'DEP0143'),
159+
set: internalUtil.deprecate(function(val) {
160+
this[kTransformState] = val;
161+
}, 'Transform.prototype._transformState is deprecated', 'DEP0143')
162+
});
163+
164+
Transform.prototype.push = function(chunk, encoding) {
165+
this[kTransformState].needTransform = false;
166+
return Duplex.prototype.push.call(this, chunk, encoding);
167+
};
168+
169+
// This is the part where you do stuff!
170+
// override this function in implementation classes.
171+
// 'chunk' is an input chunk.
172+
//
173+
// Call `push(newChunk)` to pass along transformed output
174+
// to the readable side. You may call 'push' zero or more times.
175+
//
176+
// Call `cb(err)` when you are done with this chunk. If you pass
177+
// an error, then that'll put the hurt on the whole operation. If you
178+
// never call cb(), then you'll never get another chunk.
179+
Transform.prototype._transform = function(chunk, encoding, cb) {
180+
throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
181+
};
182+
183+
Transform.prototype._write = function(chunk, encoding, cb) {
184+
const ts = this[kTransformState];
185+
ts.writecb = cb;
186+
ts.writechunk = chunk;
187+
ts.writeencoding = encoding;
188+
if (!ts.transforming) {
189+
const rs = this._readableState;
190+
if (ts.needTransform ||
191+
rs.needReadable ||
192+
rs.length < rs.highWaterMark)
193+
this._read(rs.highWaterMark);
194+
}
195+
};
196+
197+
// Doesn't matter what the args are here.
198+
// _transform does all the work.
199+
// That we got here means that the readable side wants more data.
200+
Transform.prototype._read = function(n) {
201+
const ts = this[kTransformState];
202+
203+
if (ts.writechunk !== null && !ts.transforming) {
204+
ts.transforming = true;
205+
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
206+
} else {
207+
// Mark that we need a transform, so that any data that comes in
208+
// will get processed, now that we've asked for it.
209+
ts.needTransform = true;
210+
}
211+
};
212+
213+
214+
Transform.prototype._destroy = function(err, cb) {
215+
Duplex.prototype._destroy.call(this, err, (err2) => {
216+
cb(err2);
217+
});
218+
};
219+
220+
221+
function done(stream, er, data) {
222+
if (er)
223+
return stream.emit('error', er);
224+
225+
if (data != null) // Single equals check for both `null` and `undefined`
226+
stream.push(data);
227+
228+
// These two error cases are coherence checks that can likely not be tested.
229+
if (stream._writableState.length)
230+
throw new ERR_TRANSFORM_WITH_LENGTH_0();
231+
232+
if (stream[kTransformState].transforming)
233+
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
234+
return stream.push(null);
235+
}

‎lib/internal/streams/writable.js

+787
Large diffs are not rendered by default.

‎lib/stream.js

+5-7
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ const internalBuffer = require('internal/buffer');
2828
// Note: export Stream before Readable/Writable/Duplex/...
2929
// to avoid a cross-reference(require) issues
3030
const Stream = module.exports = require('internal/streams/legacy');
31-
32-
Stream.Readable = require('_stream_readable');
33-
Stream.Writable = require('_stream_writable');
34-
Stream.Duplex = require('_stream_duplex');
35-
Stream.Transform = require('_stream_transform');
36-
Stream.PassThrough = require('_stream_passthrough');
37-
31+
Stream.Readable = require('internal/streams/readable');
32+
Stream.Writable = require('internal/streams/writable');
33+
Stream.Duplex = require('internal/streams/duplex');
34+
Stream.Transform = require('internal/streams/transform');
35+
Stream.PassThrough = require('internal/streams/passthrough');
3836
Stream.pipeline = pipeline;
3937
Stream.finished = eos;
4038

‎node.gyp

+5
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,11 @@
226226
'lib/internal/streams/duplexpair.js',
227227
'lib/internal/streams/from.js',
228228
'lib/internal/streams/legacy.js',
229+
'lib/internal/streams/readable.js',
230+
'lib/internal/streams/writable.js',
231+
'lib/internal/streams/duplex.js',
232+
'lib/internal/streams/passthrough.js',
233+
'lib/internal/streams/transform.js',
229234
'lib/internal/streams/destroy.js',
230235
'lib/internal/streams/state.js',
231236
'lib/internal/streams/pipeline.js',

‎test/message/stdin_messages.out

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ SyntaxError: Strict mode code may not include a with statement
1313
at internal/main/eval_stdin.js:*:*
1414
at Socket.<anonymous> (internal/process/execution.js:*:*)
1515
at Socket.emit (events.js:*:*)
16-
at endReadableNT (_stream_readable.js:*:*)
16+
at endReadableNT (internal/streams/readable.js:*:*)
1717
42
1818
42
1919
[stdin]:1
@@ -30,7 +30,7 @@ Error: hello
3030
at internal/main/eval_stdin.js:*:*
3131
at Socket.<anonymous> (internal/process/execution.js:*:*)
3232
at Socket.emit (events.js:*:*)
33-
at endReadableNT (_stream_readable.js:*:*)
33+
at endReadableNT (internal/streams/readable.js:*:*)
3434
[stdin]:1
3535
throw new Error("hello")
3636
^
@@ -45,7 +45,7 @@ Error: hello
4545
at internal/main/eval_stdin.js:*:*
4646
at Socket.<anonymous> (internal/process/execution.js:*:*)
4747
at Socket.emit (events.js:*:*)
48-
at endReadableNT (_stream_readable.js:*:*)
48+
at endReadableNT (internal/streams/readable.js:*:*)
4949
100
5050
[stdin]:1
5151
let x = 100; y = x;
@@ -61,7 +61,7 @@ ReferenceError: y is not defined
6161
at internal/main/eval_stdin.js:*:*
6262
at Socket.<anonymous> (internal/process/execution.js:*:*)
6363
at Socket.emit (events.js:*:*)
64-
at endReadableNT (_stream_readable.js:*:*)
64+
at endReadableNT (internal/streams/readable.js:*:*)
6565

6666
[stdin]:1
6767
let ______________________________________________; throw 10

‎test/parallel/test-bootstrap-modules.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ if (!common.isMainThread) {
9393
'Internal Binding messaging',
9494
'Internal Binding symbols',
9595
'Internal Binding worker',
96-
'NativeModule _stream_duplex',
97-
'NativeModule _stream_passthrough',
98-
'NativeModule _stream_readable',
99-
'NativeModule _stream_transform',
100-
'NativeModule _stream_writable',
96+
'NativeModule internal/streams/duplex',
97+
'NativeModule internal/streams/passthrough',
98+
'NativeModule internal/streams/readable',
99+
'NativeModule internal/streams/transform',
100+
'NativeModule internal/streams/writable',
101101
'NativeModule internal/error_serdes',
102102
'NativeModule internal/event_target',
103103
'NativeModule internal/process/worker_thread_only',

0 commit comments

Comments
 (0)
Please sign in to comment.