Skip to content

Commit

Permalink
Wrap upload body into a Transform stream
Browse files Browse the repository at this point in the history
Fixes #937
  • Loading branch information
szmarczak committed Nov 22, 2019
1 parent 52b7fc5 commit cd11a50
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 190 deletions.
24 changes: 8 additions & 16 deletions source/get-response.ts
@@ -1,14 +1,17 @@
import {promisify} from 'util';
import {IncomingMessage} from 'http';
import EventEmitter = require('events');
import stream = require('stream');
import decompressResponse = require('decompress-response');
import mimicResponse = require('mimic-response');
import {NormalizedOptions, Response} from './utils/types';
import {downloadProgress} from './progress';
import {createProgressStream} from './progress';

export default (response: IncomingMessage, options: NormalizedOptions, emitter: EventEmitter) => {
const pipeline = promisify(stream.pipeline);

export default async (response: IncomingMessage, options: NormalizedOptions, emitter: EventEmitter) => {
const downloadBodySize = Number(response.headers['content-length']) || undefined;
const progressStream = downloadProgress(emitter, downloadBodySize);
const progressStream = createProgressStream('downloadProgress', emitter, downloadBodySize);

mimicResponse(response, progressStream);

Expand All @@ -23,19 +26,8 @@ export default (response: IncomingMessage, options: NormalizedOptions, emitter:

emitter.emit('response', newResponse);

emitter.emit('downloadProgress', {
percent: 0,
transferred: 0,
total: downloadBodySize
});

stream.pipeline(
return pipeline(
response,
progressStream,
error => {
if (error) {
emitter.emit('error', error);
}
}
progressStream
);
};
12 changes: 4 additions & 8 deletions source/normalize-arguments.ts
Expand Up @@ -279,15 +279,13 @@ export const normalizeArguments = (url: URLOrOptions, options?: Options, default
const withoutBody: ReadonlySet<string> = new Set(['GET', 'HEAD']);

export type NormalizedRequestArguments = https.RequestOptions & {
body: Pick<NormalizedOptions, 'body'>;
body?: ReadableStream;
url: Pick<NormalizedOptions, 'url'>;
};

export const normalizeRequestArguments = async (options: NormalizedOptions): Promise<NormalizedRequestArguments> => {
options = mergeOptions(options);

let uploadBodySize: number | undefined;

// Serialize body
const {headers} = options;
const isForm = !is.undefined(options.form);
Expand Down Expand Up @@ -328,12 +326,10 @@ export const normalizeRequestArguments = async (options: NormalizedOptions): Pro
options.body = JSON.stringify(options.json);
}

// Convert buffer to stream to receive upload progress events (#322)
if (is.buffer(options.body)) {
uploadBodySize = options.body.length;
const uploadBodySize = await getBodySize(options);

if (!is.nodeStream(options.body)) {
options.body = toReadableStream(options.body);
} else {
uploadBodySize = await getBodySize(options);
}

// See https://tools.ietf.org/html/rfc7230#section-3.3.2
Expand Down
101 changes: 17 additions & 84 deletions source/progress.ts
@@ -1,115 +1,48 @@
import {ClientRequest} from 'http';
import {Transform as TransformStream} from 'stream';
import {Socket} from 'net';
import EventEmitter = require('events');
import is from '@sindresorhus/is';

export function downloadProgress(emitter: EventEmitter, downloadBodySize?: number): TransformStream {
let downloadedBytes = 0;
export function createProgressStream(name: 'downloadProgress' | 'uploadProgress', emitter: EventEmitter, totalBytes?: number | string): TransformStream {
let transformedBytes = 0;

if (is.string(totalBytes)) {
totalBytes = Number(totalBytes);
}

const progressStream = new TransformStream({
transform(chunk, _encoding, callback) {
downloadedBytes += chunk.length;
transformedBytes += chunk.length;

const percent = downloadBodySize ? downloadedBytes / downloadBodySize : 0;
const percent = totalBytes ? transformedBytes / (totalBytes as number) : 0;

// Let `flush()` be responsible for emitting the last event
if (percent < 1) {
emitter.emit('downloadProgress', {
emitter.emit(name, {
percent,
transferred: downloadedBytes,
total: downloadBodySize
transferred: transformedBytes,
total: totalBytes
});
}

callback(undefined, chunk);
},

flush(callback) {
emitter.emit('downloadProgress', {
emitter.emit(name, {
percent: 1,
transferred: downloadedBytes,
total: downloadBodySize
transferred: transformedBytes,
total: totalBytes
});

callback();
}
});

return progressStream;
}

export function uploadProgress(request: ClientRequest, emitter: EventEmitter, uploadBodySize?: number): void {
const uploadEventFrequency = 150;
let uploadedBytes = 0;
let progressInterval: NodeJS.Timeout;

emitter.emit('uploadProgress', {
emitter.emit(name, {
percent: 0,
transferred: 0,
total: uploadBodySize
});

request.once('error', () => {
clearInterval(progressInterval);
});

request.once('abort', () => {
clearInterval(progressInterval);
});

request.once('response', () => {
clearInterval(progressInterval);

emitter.emit('uploadProgress', {
percent: 1,
transferred: uploadedBytes,
total: uploadBodySize
});
total: totalBytes
});

request.once('socket', (socket: Socket) => {
const onSocketConnect = (): void => {
progressInterval = setInterval(() => {
const lastUploadedBytes = uploadedBytes;

/* istanbul ignore next: future versions of Node may not have this property */
if (!is.string((request as any)._header)) {
clearInterval(progressInterval);

const url = new URL('https://github.com/sindresorhus/got/issues/new');
url.searchParams.set('title', '`request._header` is not present');
url.searchParams.set('body', 'It causes `uploadProgress` to fail.');

console.warn('`request._header` is not present. Please report this as a bug:\n' + url.href);
return;
}

const headersSize = Buffer.byteLength((request as any)._header);
uploadedBytes = socket.bytesWritten - headersSize;

// Don't emit events with unchanged progress and
// prevent last event from being emitted, because
// it's emitted when `response` is emitted
if (uploadedBytes === lastUploadedBytes || uploadedBytes === uploadBodySize) {
return;
}

emitter.emit('uploadProgress', {
percent: uploadBodySize ? uploadedBytes / uploadBodySize : 0,
transferred: uploadedBytes,
total: uploadBodySize
});
}, uploadEventFrequency);
};

/* istanbul ignore next: hard to test */
if (socket.connecting) {
socket.once('connect', onSocketConnect);
} else if (socket.writable) {
// The socket is being reused from pool,
// so the connect event will not be emitted
onSocketConnect();
}
});
return progressStream;
}
105 changes: 39 additions & 66 deletions source/request-as-event-emitter.ts
@@ -1,18 +1,21 @@
import {promisify} from 'util';
import stream = require('stream');
import EventEmitter = require('events');
import http = require('http');
import CacheableRequest = require('cacheable-request');
import is from '@sindresorhus/is';
import timer, {Timings} from '@szmarczak/http-timer';
import timer from '@szmarczak/http-timer';
import timedOut, {TimeoutError as TimedOutTimeoutError} from './utils/timed-out';
import calculateRetryDelay from './calculate-retry-delay';
import getResponse from './get-response';
import {normalizeRequestArguments} from './normalize-arguments';
import {uploadProgress} from './progress';
import {createProgressStream} from './progress';
import {CacheError, MaxRedirectsError, RequestError, TimeoutError} from './errors';
import urlToOptions from './utils/url-to-options';
import {NormalizedOptions, Response, ResponseObject} from './utils/types';

const pipeline = promisify(stream.pipeline);

const redirectCodes: ReadonlySet<number> = new Set([300, 301, 302, 303, 304, 307, 308]);

export interface RequestAsEventEmitter extends EventEmitter {
Expand All @@ -28,7 +31,6 @@ export default (options: NormalizedOptions) => {
let retryCount = 0;

let currentRequest: http.ClientRequest;
let shouldAbort = false;

const emitError = async (error: Error): Promise<void> => {
try {
Expand All @@ -46,7 +48,6 @@ export default (options: NormalizedOptions) => {
const get = async (): Promise<void> => {
let httpOptions = await normalizeRequestArguments(options);

let timings: Timings;
const handleResponse = async (response: http.ServerResponse | ResponseObject): Promise<void> => {
try {
/* istanbul ignore next: fixes https://github.com/electron/electron/blob/cbb460d47628a7a146adf4419ed48550a98b2923/lib/browser/api/net.js#L59-L65 */
Expand All @@ -70,7 +71,6 @@ export default (options: NormalizedOptions) => {
typedResponse.url = options.url.toString();
typedResponse.requestUrl = requestURL;
typedResponse.retryCount = retryCount;
typedResponse.timings = timings;
typedResponse.redirectUrls = redirects;
typedResponse.request = {options};
typedResponse.isFromCache = typedResponse.fromCache ?? false;
Expand Down Expand Up @@ -134,34 +134,22 @@ export default (options: NormalizedOptions) => {
return;
}

getResponse(typedResponse, options, emitter);
await getResponse(typedResponse, options, emitter);
} catch (error) {
emitError(error);
}
};

const handleRequest = (request: http.ClientRequest): void => {
if (shouldAbort) {
request.abort();
return;
}

currentRequest = request;

const handleRequest = async (request: http.ClientRequest): Promise<void> => {
// `request.aborted` is a boolean since v11.0.0: https://github.com/nodejs/node/commit/4b00c4fafaa2ae8c41c1f78823c0feb810ae4723#diff-e3bc37430eb078ccbafe3aa3b570c91a
// We need to allow `TimedOutTimeoutError` here, because it `stream.pipeline(…)` aborts it automatically.
const isAborted = () => typeof request.aborted === 'number' || (request.aborted as unknown as boolean) === true;

const onError = (error: Error): void => {
const isTimedOutError = error instanceof TimedOutTimeoutError;

if (!isTimedOutError && isAborted()) {
return;
}
currentRequest = request;

if (isTimedOutError) {
// @ts-ignore TS is dumb.
error = new TimeoutError(error, timings, options);
const onError = (error: Error): void => {
if (error instanceof TimedOutTimeoutError) {
error = new TimeoutError(error, request.timings, options);
} else {
error = new RequestError(error, options);
}
Expand All @@ -171,54 +159,37 @@ export default (options: NormalizedOptions) => {
}
};

const uploadComplete = (error?: Error): void => {
if (error) {
onError(error);
return;
}

// No need to attach an error handler here,
// as `stream.pipeline(…)` doesn't remove this handler
// to allow stream reuse.

request.emit('upload-complete');
};

request.on('error', onError);
try {
timer(request);
timedOut(request, options.timeout, options.url);

timings = timer(request); // TODO: Make `@szmarczak/http-timer` set `request.timings` and `response.timings`
emitter.emit('request', request);

const uploadBodySize = httpOptions.headers['content-length'] ? Number(httpOptions.headers['content-length']) : undefined;
uploadProgress(request, emitter, uploadBodySize);
const uploadStream = createProgressStream('uploadProgress', emitter, httpOptions.headers['content-length'] as string);

timedOut(request, options.timeout, options.url);
await pipeline(
// @ts-ignore Cannot assign ReadableStream to ReadableStream
httpOptions.body,
uploadStream,
request
);

emitter.emit('request', request);
request.once('error', error => {
if (isAborted() && !(error instanceof TimedOutTimeoutError)) {
return;
}

if (isAborted()) {
return;
}
onError(error);
});

try {
if (options.method === 'POST' || options.method === 'PUT' || options.method === 'PATCH') {
if (is.nodeStream(httpOptions.body)) {
// `stream.pipeline(…)` handles `error` for us.
request.removeListener('error', onError);

stream.pipeline(
// @ts-ignore Upgrade `@sindresorhus/is`
httpOptions.body,
request,
uploadComplete
);
} else {
request.end(httpOptions.body, uploadComplete);
}
} else {
request.end(uploadComplete);
}
request.emit('upload-complete');
} catch (error) {
emitError(new RequestError(error, options));
if (isAborted() && error.message === 'Premature close') {
// The request was aborted on purpose
return;
}

onError(error);
}
};

Expand Down Expand Up @@ -296,10 +267,12 @@ export default (options: NormalizedOptions) => {
};

emitter.abort = () => {
emitter.prependListener('request', (request: http.ClientRequest) => {
request.abort();
});

if (currentRequest) {
currentRequest.abort();
} else {
shouldAbort = true;
}
};

Expand Down

0 comments on commit cd11a50

Please sign in to comment.