Skip to content

Commit

Permalink
http: outgoing cork
Browse files Browse the repository at this point in the history
PR-URL: #29053
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag authored and BethGriggs committed Feb 6, 2020
1 parent 55ca3a8 commit db8144b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 7 deletions.
16 changes: 16 additions & 0 deletions doc/api/http.md
Expand Up @@ -1223,6 +1223,13 @@ added: v0.3.0

See [`response.socket`][].

### `response.cork()`
<!-- YAML
added: REPLACEME
-->

See [`writable.cork()`][].

### `response.end([data[, encoding]][, callback])`
<!-- YAML
added: v0.1.90
Expand Down Expand Up @@ -1508,6 +1515,13 @@ response.statusMessage = 'Not found';
After response header was sent to the client, this property indicates the
status message which was sent out.

### `response.uncork()`
<!-- YAML
added: REPLACEME
-->

See [`writable.uncork()`][].

### `response.writableEnded`
<!-- YAML
added: v12.9.0
Expand Down Expand Up @@ -2348,3 +2362,5 @@ not abort the request or do anything besides add a `'timeout'` event.
[`socket.unref()`]: net.html#net_socket_unref
[`url.parse()`]: url.html#url_url_parse_urlstring_parsequerystring_slashesdenotehost
[`HPE_HEADER_OVERFLOW`]: errors.html#errors_hpe_header_overflow
[`writable.cork()`]: stream.html#stream_writable_cork
[`writable.uncork()`]: stream.html#stream_writable_uncork
47 changes: 40 additions & 7 deletions lib/_http_outgoing.js
Expand Up @@ -55,6 +55,8 @@ const { validateString } = require('internal/validators');
const HIGH_WATER_MARK = getDefaultHighWaterMark();
const { CRLF, debug } = common;

const kCorked = Symbol('corked');

const RE_CONN_CLOSE = /(?:^|\W)close(?:$|\W)/i;
const RE_TE_CHUNKED = common.chunkExpression;

Expand Down Expand Up @@ -98,6 +100,7 @@ function OutgoingMessage() {

this.finished = false;
this._headerSent = false;
this[kCorked] = 0;

this.socket = null;
this.connection = null;
Expand Down Expand Up @@ -137,6 +140,13 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
}
});

Object.defineProperty(OutgoingMessage.prototype, 'writableCorked', {
get() {
const corked = this.socket ? this.socket.writableCorked : 0;
return corked + this[kCorked];
}
});

Object.defineProperty(OutgoingMessage.prototype, '_headers', {
get: internalUtil.deprecate(function() {
return this.getHeaders();
Expand Down Expand Up @@ -204,6 +214,21 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
return headers;
};

OutgoingMessage.prototype.cork = function() {
if (this.socket) {
this.socket.cork();
} else {
this[kCorked]++;
}
};

OutgoingMessage.prototype.uncork = function() {
if (this.socket) {
this.socket.uncork();
} else if (this[kCorked]) {
this[kCorked]--;
}
};

OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {

Expand Down Expand Up @@ -694,7 +719,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
return this;
}

var uncork;
if (this.socket) {
this.socket.cork();
}

if (chunk) {
if (typeof chunk !== 'string' && !(chunk instanceof Buffer)) {
throw new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
Expand All @@ -705,10 +733,6 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
else
this._contentLength = chunk.length;
}
if (this.connection) {
this.connection.cork();
uncork = true;
}
write_(this, chunk, encoding, null, true);
} else if (!this._header) {
this._contentLength = 0;
Expand All @@ -727,8 +751,12 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
this._send('', 'latin1', finish);
}

if (uncork)
this.connection.uncork();
if (this.socket) {
// Fully uncork connection on end().
this.socket._writableState.corked = 1;
this.socket.uncork();
}
this[kCorked] = 0;

this.finished = true;

Expand Down Expand Up @@ -789,6 +817,11 @@ OutgoingMessage.prototype._flush = function _flush() {
};

OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
while (this[kCorked]) {
this[kCorked]--;
socket.cork();
}

const outputLength = this.outputData.length;
if (outputLength <= 0)
return undefined;
Expand Down
12 changes: 12 additions & 0 deletions lib/internal/http2/compat.js
Expand Up @@ -503,6 +503,10 @@ class Http2ServerResponse extends Stream {
return this[kState].statusCode;
}

get writableCorked() {
return this[kStream].writableCorked;
}

set statusCode(code) {
code |= 0;
if (code >= 100 && code < 200)
Expand Down Expand Up @@ -627,6 +631,14 @@ class Http2ServerResponse extends Stream {
return this;
}

cork() {
this[kStream].cork();
}

uncork() {
this[kStream].uncork();
}

write(chunk, encoding, cb) {
if (typeof encoding === 'function') {
cb = encoding;
Expand Down
33 changes: 33 additions & 0 deletions test/parallel/test-http-response-cork.js
@@ -0,0 +1,33 @@
'use strict';
const common = require('../common');
const http = require('http');
const assert = require('assert');

const server = http.createServer((req, res) => {
let corked = false;
const originalWrite = res.socket.write;
res.socket.write = common.mustCall((...args) => {
assert.strictEqual(corked, false);
return originalWrite.call(res.socket, ...args);
}, 5);
corked = true;
res.cork();
assert.strictEqual(res.writableCorked, res.socket.writableCorked);
res.cork();
assert.strictEqual(res.writableCorked, res.socket.writableCorked);
res.writeHead(200, { 'a-header': 'a-header-value' });
res.uncork();
assert.strictEqual(res.writableCorked, res.socket.writableCorked);
corked = false;
res.end('asd');
assert.strictEqual(res.writableCorked, res.socket.writableCorked);
});

server.listen(0, () => {
http.get({ port: server.address().port }, (res) => {
res.on('data', common.mustCall());
res.on('end', common.mustCall(() => {
server.close();
}));
});
});

0 comments on commit db8144b

Please sign in to comment.