diff --git a/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts b/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts index deaa19a0a2285b3..8aa6aa826d757a9 100644 --- a/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts +++ b/packages/next-swc/crates/next-core/js/src/entry/app/edge-page-bootstrap.ts @@ -80,11 +80,27 @@ async function render(request: NextRequest, event: NextFetchEvent) { response.headers.append('Vary', RSC_VARY_HEADER) const writer = tranform.writable.getWriter() - result.pipe({ + + let innerClose: undefined | (() => void) + const target = { write: (chunk: Uint8Array) => writer.write(chunk), end: () => writer.close(), - destroy: (reason?: Error) => writer.abort(reason), - }) + + on(_event: 'close', cb: () => void) { + innerClose = cb + }, + off(_event: 'close', _cb: () => void) { + innerClose = undefined + }, + } + const onClose = () => { + innerClose?.() + } + // No, this cannot be replaced with `finally`, because early cancelling + // the stream will create a rejected promise, and finally will create an + // unhandled rejection. + writer.closed.then(onClose, onClose) + result.pipe(target) return response } diff --git a/packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts b/packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts index 2f5f56e74cdb278..873ddc77886f9f8 100644 --- a/packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts +++ b/packages/next-swc/crates/next-core/js/src/internal/nodejs-proxy-handler.ts @@ -11,7 +11,10 @@ import { NodeNextResponse, } from 'next/dist/server/base-http/node' import { sendResponse } from 'next/dist/server/send-response' -import { NextRequestAdapter } from 'next/dist/server/web/spec-extension/adapters/next-request' +import { + NextRequestAdapter, + signalFromNodeResponse, +} from 'next/dist/server/web/spec-extension/adapters/next-request' import { RouteHandlerManagerContext } from 'next/dist/server/future/route-handler-managers/route-handler-manager' import { attachRequestMeta } from './next-request-helpers' @@ -43,7 +46,10 @@ export default (routeModule: RouteModule) => { } const routeResponse = await routeModule.handle( - NextRequestAdapter.fromNodeNextRequest(req), + NextRequestAdapter.fromNodeNextRequest( + req, + signalFromNodeResponse(response) + ), context ) diff --git a/packages/next/src/export/worker.ts b/packages/next/src/export/worker.ts index eaa61b851d9ab82..5fe8e690ff29573 100644 --- a/packages/next/src/export/worker.ts +++ b/packages/next/src/export/worker.ts @@ -45,7 +45,10 @@ import { NodeNextRequest } from '../server/base-http/node' import { isAppRouteRoute } from '../lib/is-app-route-route' import { toNodeOutgoingHttpHeaders } from '../server/web/utils' import { RouteModuleLoader } from '../server/future/helpers/module-loader/route-module-loader' -import { NextRequestAdapter } from '../server/web/spec-extension/adapters/next-request' +import { + NextRequestAdapter, + signalFromNodeResponse, +} from '../server/web/spec-extension/adapters/next-request' import * as ciEnvironment from '../telemetry/ci-info' const envConfig = require('../shared/lib/runtime-config') @@ -388,7 +391,8 @@ export default async function exportPage({ // Ensure that the url for the page is absolute. 
req.url = `http://localhost:3000${req.url}` const request = NextRequestAdapter.fromNodeNextRequest( - new NodeNextRequest(req) + new NodeNextRequest(req), + signalFromNodeResponse(res) ) // Create the context for the handler. This contains the params from diff --git a/packages/next/src/server/base-server.ts b/packages/next/src/server/base-server.ts index 134948c1bc69655..70660e4c5eca9cb 100644 --- a/packages/next/src/server/base-server.ts +++ b/packages/next/src/server/base-server.ts @@ -116,6 +116,7 @@ import { type RouteMatch, } from './future/route-matches/route-match' import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path' +import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request' export type FindComponentsResult = { components: LoadComponentsReturnType @@ -1837,7 +1838,12 @@ export default abstract class Server { try { // Handle the match and collect the response if it's a static response. - const response = await this.handlers.handle(match, req, context) + const response = await this.handlers.handle( + match, + req, + context, + signalFromNodeResponse((res as NodeNextResponse).originalResponse) + ) ;(req as any).fetchMetrics = ( context.staticGenerationContext as any diff --git a/packages/next/src/server/future/route-handler-managers/route-handler-manager.ts b/packages/next/src/server/future/route-handler-managers/route-handler-manager.ts index de7e8360238cbc6..011df25738a2fd8 100644 --- a/packages/next/src/server/future/route-handler-managers/route-handler-manager.ts +++ b/packages/next/src/server/future/route-handler-managers/route-handler-manager.ts @@ -24,7 +24,8 @@ export class RouteHandlerManager { public async handle( match: AppRouteRouteMatch, req: BaseNextRequest, - context: RouteHandlerManagerContext + context: RouteHandlerManagerContext, + signal: AbortSignal ): Promise { // The module supports minimal mode, load the minimal module. const module = await RouteModuleLoader.load( @@ -33,7 +34,7 @@ export class RouteHandlerManager { ) // Convert the BaseNextRequest to a NextRequest. - const request = NextRequestAdapter.fromBaseNextRequest(req) + const request = NextRequestAdapter.fromBaseNextRequest(req, signal) // Get the response from the handler and send it back. 
return await module.handle(request, context) diff --git a/packages/next/src/server/lib/route-resolver.ts b/packages/next/src/server/lib/route-resolver.ts index 90867437777cad2..e74cc02310727b2 100644 --- a/packages/next/src/server/lib/route-resolver.ts +++ b/packages/next/src/server/lib/route-resolver.ts @@ -17,9 +17,9 @@ import { proxyRequest } from './router-utils/proxy-request' import { getResolveRoutes } from './router-utils/resolve-routes' import { PERMANENT_REDIRECT_STATUS } from '../../shared/lib/constants' import { splitCookiesString, toNodeOutgoingHttpHeaders } from '../web/utils' -import { signalFromNodeRequest } from '../web/spec-extension/adapters/next-request' +import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request' import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher' -import { pipeReadable } from './server-ipc/invoke-request' +import { pipeReadable } from '../pipe-readable' type RouteResult = | { @@ -132,7 +132,7 @@ export async function makeResolver( serverAddr.port || 3000 }${req.url}`, body: cloneableBody, - signal: signalFromNodeRequest(req), + signal: signalFromNodeResponse(res), }, useCache: true, onWarning: console.warn, @@ -160,11 +160,11 @@ export async function makeResolver( } res.statusCode = result.response.status - for await (const chunk of result.response.body || ([] as any)) { - if (res.closed) break - res.write(chunk) + if (result.response.body) { + await pipeReadable(result.response.body, res) + } else { + res.end() } - res.end() } catch (err) { console.error(err) res.statusCode = 500 @@ -218,7 +218,12 @@ export async function makeResolver( req: IncomingMessage, res: ServerResponse ): Promise { - const routeResult = await resolveRoutes(req, new Set(), false) + const routeResult = await resolveRoutes( + req, + new Set(), + false, + signalFromNodeResponse(res) + ) const { matchedOutput, bodyStream, diff --git a/packages/next/src/server/lib/router-server.ts b/packages/next/src/server/lib/router-server.ts index 790b1f072e7d620..c80d3a8b502be2f 100644 --- a/packages/next/src/server/lib/router-server.ts +++ b/packages/next/src/server/lib/router-server.ts @@ -15,7 +15,8 @@ import { filterReqHeaders } from './server-ipc/utils' import { findPagesDir } from '../../lib/find-pages-dir' import { setupFsCheck } from './router-utils/filesystem' import { proxyRequest } from './router-utils/proxy-request' -import { invokeRequest, pipeReadable } from './server-ipc/invoke-request' +import { invokeRequest } from './server-ipc/invoke-request' +import { isAbortError, pipeReadable } from '../pipe-readable' import { createRequestResponseMocks } from './mock-request' import { createIpcServer, createWorker } from './server-ipc' import { UnwrapPromise } from '../../lib/coalesced-function' @@ -29,6 +30,7 @@ import { PHASE_DEVELOPMENT_SERVER, PERMANENT_REDIRECT_STATUS, } from '../../shared/lib/constants' +import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request' let initializeResult: | undefined @@ -331,14 +333,26 @@ export async function initialize(opts: { debug('invokeRender', renderUrl, invokeHeaders) - const invokeRes = await invokeRequest( - renderUrl, - { - headers: invokeHeaders, - method: req.method, - }, - getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream() - ) + let invokeRes + try { + invokeRes = await invokeRequest( + renderUrl, + { + headers: invokeHeaders, + method: req.method, + signal: signalFromNodeResponse(res), + }, + getRequestMeta(req, 
'__NEXT_CLONABLE_BODY')?.cloneBodyStream() + ) + } catch (e) { + // If the client aborts before we can receive a response object (when + // the headers are flushed), then we can early exit without further + // processing. + if (isAbortError(e)) { + return + } + throw e + } debug('invokeRender res', invokeRes.status, invokeRes.headers) @@ -419,7 +433,12 @@ export async function initialize(opts: { resHeaders, bodyStream, matchedOutput, - } = await resolveRoutes(req, matchedDynamicRoutes, false) + } = await resolveRoutes( + req, + matchedDynamicRoutes, + false, + signalFromNodeResponse(res) + ) if (devInstance && matchedOutput?.type === 'devVirtualFsItem') { const origUrl = req.url || '/' @@ -687,7 +706,8 @@ export async function initialize(opts: { const { matchedOutput, parsedUrl } = await resolveRoutes( req, new Set(), - true + true, + signalFromNodeResponse(socket) ) // TODO: allow upgrade requests to pages/app paths? diff --git a/packages/next/src/server/lib/router-utils/proxy-request.ts b/packages/next/src/server/lib/router-utils/proxy-request.ts index 60fb0df7209da37..c95df56f0f29250 100644 --- a/packages/next/src/server/lib/router-utils/proxy-request.ts +++ b/packages/next/src/server/lib/router-utils/proxy-request.ts @@ -34,6 +34,24 @@ export async function proxyRequest( await new Promise((proxyResolve, proxyReject) => { let finished = false + // http-proxy does not properly detect a client disconnect in newer + // versions of Node.js. This is caused because it only listens for the + // `aborted` event on the our request object, but it also fully reads + // and closes the request object. Node **will not** fire `aborted` when + // the request is already closed. Listening for `close` on our response + // object will detect the disconnect, and we can abort the proxy's + // connection. 
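// Editorial sketch, not part of the patch: a minimal reproduction of the
// Node.js behaviour the comment above describes. Once a request body has been
// fully read, the request's 'aborted' event no longer fires, but 'close' on
// the response still reports the client disconnect. The port and log messages
// are assumptions for illustration only.
import { createServer } from 'http'

createServer((req, res) => {
  // Fully consume the request body; this closes the request stream, after
  // which 'aborted' will not fire on newer Node.js versions.
  req.resume()
  req.on('aborted', () => console.log('request aborted'))
  // The response is never ended here, so a 'close' event means the client
  // disconnected. This is the event the proxy uses to destroy proxyReq and
  // proxyRes below.
  res.on('close', () => console.log('client disconnected'))
}).listen(3000)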
+ proxy.on('proxyReq', (proxyReq) => { + res.on('close', () => proxyReq.destroy()) + }) + proxy.on('proxyRes', (proxyRes) => { + if (res.destroyed) { + proxyRes.destroy() + } else { + res.on('close', () => proxyRes.destroy()) + } + }) + proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => { const cleanup = (err: any) => { // cleanup event listeners to allow clean garbage collection @@ -59,7 +77,7 @@ export async function proxyRequest( finished = true proxyReject(err) - if (!res.closed) { + if (!res.destroyed) { res.statusCode = 500 res.end('Internal Server Error') } diff --git a/packages/next/src/server/lib/router-utils/resolve-routes.ts b/packages/next/src/server/lib/router-utils/resolve-routes.ts index dcdce36c9eb8409..07fdfc4293612df 100644 --- a/packages/next/src/server/lib/router-utils/resolve-routes.ts +++ b/packages/next/src/server/lib/router-utils/resolve-routes.ts @@ -13,6 +13,7 @@ import { Header } from '../../../lib/load-custom-routes' import { stringifyQuery } from '../../server-route-utils' import { toNodeOutgoingHttpHeaders } from '../../web/utils' import { invokeRequest } from '../server-ipc/invoke-request' +import { isAbortError } from '../../pipe-readable' import { getCookieParser, setLazyProp } from '../../api-utils' import { getHostname } from '../../../shared/lib/get-hostname' import { UnwrapPromise } from '../../../lib/coalesced-function' @@ -93,7 +94,8 @@ export function getResolveRoutes( async function resolveRoutes( req: IncomingMessage, matchedDynamicRoutes: Set, - isUpgradeReq?: boolean + isUpgradeReq: boolean, + signal: AbortSignal ): Promise<{ finished: boolean statusCode?: number @@ -453,14 +455,31 @@ export function getResolveRoutes( debug('invoking middleware', renderUrl, invokeHeaders) - const middlewareRes = await invokeRequest( - renderUrl, - { - headers: invokeHeaders, - method: req.method, - }, - getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream() - ) + let middlewareRes + try { + middlewareRes = await invokeRequest( + renderUrl, + { + headers: invokeHeaders, + method: req.method, + signal, + }, + getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream() + ) + } catch (e) { + // If the client aborts before we can receive a response object + // (when the headers are flushed), then we can early exit without + // further processing. 
+ if (isAbortError(e)) { + return { + parsedUrl, + resHeaders, + finished: true, + } + } + throw e + } + const middlewareHeaders = toNodeOutgoingHttpHeaders( middlewareRes.headers ) as Record diff --git a/packages/next/src/server/lib/server-ipc/invoke-request.ts b/packages/next/src/server/lib/server-ipc/invoke-request.ts index c401f5ac928c2d6..c9370983329adc9 100644 --- a/packages/next/src/server/lib/server-ipc/invoke-request.ts +++ b/packages/next/src/server/lib/server-ipc/invoke-request.ts @@ -1,7 +1,7 @@ import '../../node-polyfill-fetch' import type { IncomingMessage } from 'http' -import type { Writable, Readable } from 'stream' +import type { Readable } from 'stream' import { filterReqHeaders } from './utils' export const invokeRequest = async ( @@ -9,6 +9,7 @@ export const invokeRequest = async ( requestInit: { headers: IncomingMessage['headers'] method: IncomingMessage['method'] + signal?: AbortSignal }, readableBody?: Readable | ReadableStream ) => { @@ -22,10 +23,11 @@ export const invokeRequest = async ( ...requestInit.headers, }) as IncomingMessage['headers'] - const invokeRes = await fetch(parsedTargetUrl.toString(), { + return await fetch(parsedTargetUrl.toString(), { headers: invokeHeaders as any as Headers, method: requestInit.method, redirect: 'manual', + signal: requestInit.signal, ...(requestInit.method !== 'GET' && requestInit.method !== 'HEAD' && @@ -41,31 +43,4 @@ export const invokeRequest = async ( internal: true, }, }) - - return invokeRes -} - -export async function pipeReadable( - readable: ReadableStream, - writable: Writable -) { - const reader = readable.getReader() - - async function doRead() { - const item = await reader.read() - - if (item?.value) { - writable.write(Buffer.from(item?.value)) - - if ('flush' in writable) { - ;(writable as any).flush() - } - } - - if (!item?.done) { - return doRead() - } - } - await doRead() - writable.end() } diff --git a/packages/next/src/server/lib/start-server.ts b/packages/next/src/server/lib/start-server.ts index cb79ca3b83d6346..cd7538455db664a 100644 --- a/packages/next/src/server/lib/start-server.ts +++ b/packages/next/src/server/lib/start-server.ts @@ -13,12 +13,14 @@ import { getCloneableBody } from '../body-streams' import { filterReqHeaders } from './server-ipc/utils' import setupCompression from 'next/dist/compiled/compression' import { normalizeRepeatedSlashes } from '../../shared/lib/utils' -import { invokeRequest, pipeReadable } from './server-ipc/invoke-request' +import { invokeRequest } from './server-ipc/invoke-request' +import { isAbortError, pipeReadable } from '../pipe-readable' import { genRouterWorkerExecArgv, getDebugPort, getNodeOptionsWithoutInspect, } from './utils' +import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request' const debug = setupDebug('next:start-server') @@ -388,14 +390,26 @@ export async function startServer({ targetHost === 'localhost' ? '127.0.0.1' : targetHost }:${routerPort}${req.url || '/'}` - const invokeRes = await invokeRequest( - targetUrl, - { - headers: req.headers, - method: req.method, - }, - getCloneableBody(req).cloneBodyStream() - ) + let invokeRes + try { + invokeRes = await invokeRequest( + targetUrl, + { + headers: req.headers, + method: req.method, + signal: signalFromNodeResponse(res), + }, + getCloneableBody(req).cloneBodyStream() + ) + } catch (e) { + // If the client aborts before we can receive a response object (when + // the headers are flushed), then we can early exit without further + // processing. 
+ if (isAbortError(e)) { + return + } + throw e + } res.statusCode = invokeRes.status res.statusMessage = invokeRes.statusText diff --git a/packages/next/src/server/next-server.ts b/packages/next/src/server/next-server.ts index 57431a8634fabf4..ac91f9318fc99ed 100644 --- a/packages/next/src/server/next-server.ts +++ b/packages/next/src/server/next-server.ts @@ -91,12 +91,13 @@ import { getTracer } from './lib/trace/tracer' import { NextNodeServerSpan } from './lib/trace/constants' import { nodeFs } from './lib/node-fs-methods' import { getRouteRegex } from '../shared/lib/router/utils/route-regex' -import { invokeRequest, pipeReadable } from './lib/server-ipc/invoke-request' +import { invokeRequest } from './lib/server-ipc/invoke-request' +import { pipeReadable } from './pipe-readable' import { filterReqHeaders } from './lib/server-ipc/utils' import { createRequestResponseMocks } from './lib/mock-request' import chalk from 'next/dist/compiled/chalk' import { NEXT_RSC_UNION_QUERY } from '../client/components/app-router-headers' -import { signalFromNodeRequest } from './web/spec-extension/adapters/next-request' +import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request' import { RouteModuleLoader } from './future/helpers/module-loader/route-module-loader' import { loadManifest } from './load-manifest' @@ -520,6 +521,7 @@ export default class NextNodeServer extends BaseServer { { method: newReq.method || 'GET', headers: newReq.headers, + signal: signalFromNodeResponse(res.originalResponse), } ) const filteredResHeaders = filterReqHeaders( @@ -1522,8 +1524,8 @@ export default class NextNodeServer extends BaseServer { url: url, page: page, body: getRequestMeta(params.request, '__NEXT_CLONABLE_BODY'), - signal: signalFromNodeRequest( - (params.request as NodeNextRequest).originalRequest + signal: signalFromNodeResponse( + (params.response as NodeNextResponse).originalResponse ), }, useCache: true, @@ -1624,14 +1626,12 @@ export default class NextNodeServer extends BaseServer { res.statusCode = result.response.status const { originalResponse } = res as NodeNextResponse - for await (const chunk of result.response.body || ([] as any)) { - if (originalResponse.closed) break - this.streamResponseChunk(originalResponse, chunk) - } - res.send() - return { - finished: true, + if (result.response.body) { + await pipeReadable(result.response.body, originalResponse) + } else { + originalResponse.end() } + return { finished: true } } } catch (err) { if (isError(err) && err.code === 'ENOENT') { @@ -1805,8 +1805,8 @@ export default class NextNodeServer extends BaseServer { ...(params.params && { params: params.params }), }, body: getRequestMeta(params.req, '__NEXT_CLONABLE_BODY'), - signal: signalFromNodeRequest( - (params.req as NodeNextRequest).originalRequest + signal: signalFromNodeResponse( + (params.res as NodeNextResponse).originalResponse ), }, useCache: true, @@ -1835,19 +1835,7 @@ export default class NextNodeServer extends BaseServer { const nodeResStream = (params.res as NodeNextResponse).originalResponse if (result.response.body) { - // TODO(gal): not sure that we always need to stream - const { consumeUint8ArrayReadableStream } = - require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime') - try { - for await (const chunk of consumeUint8ArrayReadableStream( - result.response.body - )) { - if (nodeResStream.closed) break - nodeResStream.write(chunk) - } - } finally { - nodeResStream.end() - } + await pipeReadable(result.response.body, 
nodeResStream) } else { nodeResStream.end() } diff --git a/packages/next/src/server/pipe-readable.ts b/packages/next/src/server/pipe-readable.ts new file mode 100644 index 000000000000000..37961a6c6c26177 --- /dev/null +++ b/packages/next/src/server/pipe-readable.ts @@ -0,0 +1,100 @@ +export function isAbortError(e: any): e is Error & { name: 'AbortError' } { + return e?.name === 'AbortError' +} + +/** + * This is a minimal implementation of a Writable with just enough + * functionality to handle stream cancellation. + */ +export interface PipeTarget { + /** + * Called when new data is read from readable source. + */ + write: (chunk: Uint8Array) => unknown + + /** + * Always called once we read all data (if the writable isn't already + * destroyed by a client disconnect). + */ + end: () => unknown + + /** + * An optional method which is called after every write, to support + * immediately streaming in gzip responses. + */ + flush?: () => unknown + + /** + * The close event listener is necessary for us to detect an early client + * disconnect while we're attempting to read data. This must be done + * out-of-band so that we can cancel the readable (else we'd have to wait for + * the readable to produce more data before we could tell it to cancel). + */ + on: (event: 'close', cb: () => void) => void + + /** + * Allows us to cleanup our onClose listener. + */ + off: (event: 'close', cb: () => void) => void +} + +export async function pipeReadable( + readable: ReadableStream, + writable: PipeTarget +) { + const reader = readable.getReader() + let readerDone = false + let writableClosed = false + + // It's not enough just to check for `writable.destroyed`, because the client + // may disconnect while we're waiting for a read. We need to immediately + // cancel the readable, and that requires an out-of-band listener. + function onClose() { + writableClosed = true + writable.off('close', onClose) + + // If the reader is not yet done, we need to cancel it so that the stream + // source's resources can be cleaned up. If a read is in-progress, this + // will also ensure the read promise rejects and frees our resources. + if (!readerDone) { + readerDone = true + reader.cancel().catch(() => {}) + } + } + writable.on('close', onClose) + + try { + while (true) { + const { done, value } = await reader.read() + readerDone = done + + if (done || writableClosed) { + break + } + + if (value) { + writable.write(Buffer.from(value)) + writable.flush?.() + } + } + } catch (e) { + // If the client disconnects, we don't want to emit an unhandled error. + if (!isAbortError(e)) { + throw e + } + } finally { + writable.off('close', onClose) + + // If we broke out of the loop because of a client disconnect, and the + // close event hasn't yet fired, we can early cancel. + if (!readerDone) { + reader.cancel().catch(() => {}) + } + + // If the client hasn't disconnected yet, end the writable so that the + // response sends the final bytes. 
+ if (!writableClosed) { + writable.end() + } + } +} diff --git a/packages/next/src/server/render-result.ts b/packages/next/src/server/render-result.ts index 6975732502bcd85..423da8b912afe49 100644 --- a/packages/next/src/server/render-result.ts +++ b/packages/next/src/server/render-result.ts @@ -1,3 +1,5 @@ +import { pipeReadable, PipeTarget } from './pipe-readable' + type ContentTypeOption = string | undefined export type RenderResultMetadata = { @@ -11,13 +13,6 @@ export type RenderResultMetadata = { type RenderResultResponse = string | ReadableStream | null -export interface PipeTarget { - write: (chunk: Uint8Array) => unknown - end: () => unknown - flush?: () => unknown - destroy: (err?: Error) => unknown -} - export default class RenderResult { /** * The detected content type for the response. This is used to set the @@ -105,41 +100,6 @@ export default class RenderResult { ) } - const flush = - 'flush' in res && typeof res.flush === 'function' - ? res.flush.bind(res) - : () => {} - const reader = this.response.getReader() - - let shouldFatalError = false - try { - let result = await reader.read() - if (!result.done) { - // As we're going to write to the response, we should destroy the - // response if an error occurs. - shouldFatalError = true - } - - while (!result.done) { - // Write the data to the response. - res.write(result.value) - - // Flush it to the client (if it supports flushing). - flush() - - // Read the next chunk. - result = await reader.read() - } - - // We're done writing to the response, so we can end it. - res.end() - } catch (err) { - // If we've written to the response, we should destroy it. - if (shouldFatalError) { - res.destroy(err as any) - } - - throw err - } + return await pipeReadable(this.response, res) } } diff --git a/packages/next/src/server/send-response.ts b/packages/next/src/server/send-response.ts index a722f74bcfb4970..6e9a1ce32e5113c 100644 --- a/packages/next/src/server/send-response.ts +++ b/packages/next/src/server/send-response.ts @@ -1,5 +1,6 @@ import type { BaseNextRequest, BaseNextResponse } from './base-http' import type { NodeNextResponse } from './base-http/node' +import { pipeReadable } from './pipe-readable' import { splitCookiesString } from './web/utils' /** @@ -44,16 +45,7 @@ export async function sendResponse( // A response body must not be sent for HEAD requests. 
See https://httpwg.org/specs/rfc9110.html#HEAD if (response.body && req.method !== 'HEAD') { - const { consumeUint8ArrayReadableStream } = - require('next/dist/compiled/edge-runtime') as typeof import('next/dist/compiled/edge-runtime') - const iterator = consumeUint8ArrayReadableStream(response.body) - try { - for await (const chunk of iterator) { - originalResponse.write(chunk) - } - } finally { - originalResponse.end() - } + await pipeReadable(response.body, originalResponse) } else { originalResponse.end() } diff --git a/packages/next/src/server/stream-utils/node-web-streams-helper.ts b/packages/next/src/server/stream-utils/node-web-streams-helper.ts index d6e2bd41be64ddb..be68dacdfac468e 100644 --- a/packages/next/src/server/stream-utils/node-web-streams-helper.ts +++ b/packages/next/src/server/stream-utils/node-web-streams-helper.ts @@ -24,9 +24,12 @@ export const streamToBufferedResult = async ( renderChunks.push(decodeText(chunk, textDecoder)) }, end() {}, - destroy() {}, + + // We do not support stream cancellation + on() {}, + off() {}, } - await renderResult.pipe(writable as any) + await renderResult.pipe(writable) return renderChunks.join('') } diff --git a/packages/next/src/server/web-server.ts b/packages/next/src/server/web-server.ts index 1695870821518ff..3b9583a86afabb5 100644 --- a/packages/next/src/server/web-server.ts +++ b/packages/next/src/server/web-server.ts @@ -374,14 +374,27 @@ export default class NextWebServer extends BaseServer { if (options.result.isDynamic) { const writer = res.transformStream.writable.getWriter() - options.result.pipe({ + + let innerClose: undefined | (() => void) + const target = { write: (chunk: Uint8Array) => writer.write(chunk), end: () => writer.close(), - destroy: (err: Error) => writer.abort(err), - cork: () => {}, - uncork: () => {}, - // Not implemented: on/removeListener - } as any) + + on(_event: 'close', cb: () => void) { + innerClose = cb + }, + off(_event: 'close', _cb: () => void) { + innerClose = undefined + }, + } + const onClose = () => { + innerClose?.() + } + // No, this cannot be replaced with `finally`, because early cancelling + // the stream will create a rejected promise, and finally will create an + // unhandled rejection. + writer.closed.then(onClose, onClose) + options.result.pipe(target) } else { const payload = await options.result.toUnchunkedString() res.setHeader('Content-Length', String(byteLength(payload))) diff --git a/packages/next/src/server/web/spec-extension/adapters/next-request.ts b/packages/next/src/server/web/spec-extension/adapters/next-request.ts index 3efe37370b341e5..dc0b708c2627ecf 100644 --- a/packages/next/src/server/web/spec-extension/adapters/next-request.ts +++ b/packages/next/src/server/web/spec-extension/adapters/next-request.ts @@ -1,33 +1,62 @@ import type { BaseNextRequest } from '../../../base-http' import type { NodeNextRequest } from '../../../base-http/node' import type { WebNextRequest } from '../../../base-http/web' -import type { IncomingMessage } from 'node:http' +import type { Writable } from 'node:stream' import { getRequestMeta } from '../../../request-meta' import { fromNodeOutgoingHttpHeaders } from '../../utils' import { NextRequest } from '../request' -export function signalFromNodeRequest(request: IncomingMessage) { - const { errored } = request - if (errored) return AbortSignal.abort(errored) +/** + * Creates an AbortSignal tied to the closing of a ServerResponse (or other + * appropriate Writable). 
+ * + * This cannot be done with the request (IncomingMessage or Readable) because + * the `abort` event will not fire if to data has been fully read (because that + * will "close" the readable stream and nothing fires after that). + */ +export function signalFromNodeResponse(response: Writable) { + const { errored, destroyed } = response + if (errored || destroyed) return AbortSignal.abort(errored) + const controller = new AbortController() - request.on('error', (e) => { - controller.abort(e) - }) + // If `finish` fires first, then `res.end()` has been called and the close is + // just us finishing the stream on our side. If `close` fires first, then we + // know the client disconnected before we finished. + function onClose() { + controller.abort() + // eslint-disable-next-line @typescript-eslint/no-use-before-define + response.off('finish', onFinish) + } + function onFinish() { + response.off('close', onClose) + } + response.once('close', onClose) + response.once('finish', onFinish) + return controller.signal } export class NextRequestAdapter { - public static fromBaseNextRequest(request: BaseNextRequest): NextRequest { + public static fromBaseNextRequest( + request: BaseNextRequest, + signal: AbortSignal + ): NextRequest { // TODO: look at refining this check if ('request' in request && (request as WebNextRequest).request) { return NextRequestAdapter.fromWebNextRequest(request as WebNextRequest) } - return NextRequestAdapter.fromNodeNextRequest(request as NodeNextRequest) + return NextRequestAdapter.fromNodeNextRequest( + request as NodeNextRequest, + signal + ) } - public static fromNodeNextRequest(request: NodeNextRequest): NextRequest { + public static fromNodeNextRequest( + request: NodeNextRequest, + signal: AbortSignal + ): NextRequest { // HEAD and GET requests can not have a body. let body: BodyInit | null = null if (request.method !== 'GET' && request.method !== 'HEAD' && request.body) { @@ -57,7 +86,7 @@ export class NextRequestAdapter { headers: fromNodeOutgoingHttpHeaders(request.headers), // @ts-expect-error - see https://github.com/whatwg/fetch/pull/1457 duplex: 'half', - signal: signalFromNodeRequest(request.originalRequest), + signal, // geo // ip // nextConfig diff --git a/test/e2e/cancel-request/app/edge-route/route.ts b/test/e2e/cancel-request/app/edge-route/route.ts new file mode 100644 index 000000000000000..5b18b691299dbd3 --- /dev/null +++ b/test/e2e/cancel-request/app/edge-route/route.ts @@ -0,0 +1,27 @@ +import { Streamable } from '../../streamable' + +export const runtime = 'edge' + +let streamable: ReturnType | undefined + +export async function GET(req: Request): Promise { + // Consume the entire request body. + // This is so we don't confuse the request close with the connection close. + await req.text() + + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. 
+ if (streamable) { + const old = streamable + streamable = undefined + const i = await old.finished + return new Response(`${i}`) + } + + const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const s = (streamable = Streamable(+write!)) + req.signal.onabort = () => { + s.abort() + } + return new Response(s.stream) +} diff --git a/test/e2e/cancel-request/app/node-route/route.ts b/test/e2e/cancel-request/app/node-route/route.ts new file mode 100644 index 000000000000000..dfbe9352d6f5a39 --- /dev/null +++ b/test/e2e/cancel-request/app/node-route/route.ts @@ -0,0 +1,29 @@ +import { Streamable } from '../../streamable' + +export const runtime = 'nodejs' +// Next thinks it can statically compile this route, which breaks the test. +export const dynamic = 'force-dynamic' + +let streamable: ReturnType | undefined + +export async function GET(req: Request): Promise { + // Consume the entire request body. + // This is so we don't confuse the request close with the connection close. + await req.text() + + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. + if (streamable) { + const old = streamable + streamable = undefined + const i = await old.finished + return new Response(`${i}`) + } + + const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const s = (streamable = Streamable(+write!)) + req.signal.onabort = () => { + s.abort() + } + return new Response(s.stream) +} diff --git a/test/e2e/cancel-request/middleware.ts b/test/e2e/cancel-request/middleware.ts new file mode 100644 index 000000000000000..041d1587ea9b4a5 --- /dev/null +++ b/test/e2e/cancel-request/middleware.ts @@ -0,0 +1,29 @@ +import { Streamable } from './streamable' + +export const config = { + matcher: '/middleware', +} + +let streamable: ReturnType | undefined + +export default async function handler(req: Request): Promise { + // Consume the entire request body. + // This is so we don't confuse the request close with the connection close. + await req.text() + + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. + if (streamable) { + const old = streamable + streamable = undefined + const i = await old.finished + return new Response(`${i}`) + } + + const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const s = (streamable = Streamable(+write!)) + req.signal.onabort = () => { + s.abort() + } + return new Response(s.stream) +} diff --git a/test/e2e/cancel-request/pages/api/edge-api.ts b/test/e2e/cancel-request/pages/api/edge-api.ts new file mode 100644 index 000000000000000..8bebe9e1dec928a --- /dev/null +++ b/test/e2e/cancel-request/pages/api/edge-api.ts @@ -0,0 +1,29 @@ +import { Streamable } from '../../streamable' + +export const config = { + runtime: 'edge', +} + +let streamable: ReturnType | undefined + +export default async function handler(req: Request): Promise { + // Consume the entire request body. + // This is so we don't confuse the request close with the connection close. + await req.text() + + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. 
+ if (streamable) { + const old = streamable + streamable = undefined + const i = await old.finished + return new Response(`${i}`) + } + + const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const s = (streamable = Streamable(+write!)) + req.signal.onabort = () => { + s.abort() + } + return new Response(s.stream) +} diff --git a/test/e2e/cancel-request/pages/api/node-api.ts b/test/e2e/cancel-request/pages/api/node-api.ts new file mode 100644 index 000000000000000..5163338396f9045 --- /dev/null +++ b/test/e2e/cancel-request/pages/api/node-api.ts @@ -0,0 +1,39 @@ +import { IncomingMessage, ServerResponse } from 'http' +import { pipeline } from 'stream' +import { Readable } from '../../readable' + +export const config = { + runtime: 'nodejs', +} + +let readable: ReturnType | undefined + +export default function handler( + req: IncomingMessage, + res: ServerResponse +): Promise { + // Pages API requests have already consumed the body. + // This is so we don't confuse the request close with the connection close. + + // The 2nd request should render the stats. We don't use a query param + // because edge rendering will create a different bundle for that. + if (readable) { + const old = readable + readable = undefined + return old.finished.then((i) => { + res.end(`${i}`) + }) + } + + const write = new URL(req.url!, 'http://localhost/').searchParams.get('write') + const r = (readable = Readable(+write!)) + res.on('close', () => { + r.abort() + }) + return new Promise((resolve) => { + pipeline(r.stream, res, () => { + resolve() + res.end() + }) + }) +} diff --git a/test/e2e/cancel-request/pages/index.js b/test/e2e/cancel-request/pages/index.js new file mode 100644 index 000000000000000..57ccaeb8f3b9dcf --- /dev/null +++ b/test/e2e/cancel-request/pages/index.js @@ -0,0 +1,3 @@ +export default function Home() { + return 'index' +} diff --git a/test/e2e/cancel-request/readable.ts b/test/e2e/cancel-request/readable.ts new file mode 100644 index 000000000000000..91fe237cd8ecaac --- /dev/null +++ b/test/e2e/cancel-request/readable.ts @@ -0,0 +1,31 @@ +import * as stream from 'stream' +import { Deferred, sleep } from './sleep' + +export function Readable(write: number) { + const encoder = new TextEncoder() + const cleanedUp = new Deferred() + const aborted = new Deferred() + let i = 0 + + const readable = { + finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i), + + abort() { + aborted.resolve() + }, + stream: new stream.Readable({ + async read() { + if (i >= write) { + return + } + + await sleep(100) + this.push(encoder.encode(String(i++))) + }, + destroy() { + cleanedUp.resolve() + }, + }), + } + return readable +} diff --git a/test/e2e/cancel-request/sleep.ts b/test/e2e/cancel-request/sleep.ts new file mode 100644 index 000000000000000..0bc90d311a3157c --- /dev/null +++ b/test/e2e/cancel-request/sleep.ts @@ -0,0 +1,16 @@ +export function sleep(ms: number) { + return new Promise((res) => setTimeout(res, ms)) +} + +export class Deferred { + declare promise: Promise + declare resolve: (v?: T | PromiseLike) => void + declare reject: (r?: any) => void + + constructor() { + this.promise = new Promise((res, rej) => { + this.resolve = res + this.reject = rej + }) + } +} diff --git a/test/e2e/cancel-request/stream-cancel.test.ts b/test/e2e/cancel-request/stream-cancel.test.ts new file mode 100644 index 000000000000000..d157a8c5651ac59 --- /dev/null +++ b/test/e2e/cancel-request/stream-cancel.test.ts @@ -0,0 +1,89 @@ +import { createNextDescribe } from 
'e2e-utils'
+import { sleep } from './sleep'
+import { get } from 'http'
+
+createNextDescribe(
+  'streaming responses cancel inner stream after disconnect',
+  {
+    files: __dirname,
+  },
+  ({ next }) => {
+    // For some reason, it's flakey. Try a few times.
+    jest.retryTimes(3)
+
+    function prime(url: string, noData?: boolean) {
+      return new Promise<void>((resolve) => {
+        url = new URL(url, next.url).href
+
+        // There's a bug in node-fetch v2 where aborting the fetch will never abort
+        // the connection, because the body is a transformed stream that doesn't
+        // close the connection stream.
+        // https://github.com/node-fetch/node-fetch/pull/670
+        const req = get(url, async (res) => {
+          while (true) {
+            const value = res.read(1)
+            if (value) break
+            await sleep(5)
+          }
+
+          res.destroy()
+          resolve()
+        })
+        req.end()
+
+        if (noData) {
+          req.on('error', (e) => {
+            // Swallow the "socket hang up" message that happens if you abort
+            // before a response connection is received.
+            if ((e as any).code !== 'ECONNRESET') {
+              throw e
+            }
+          })
+
+          setTimeout(() => {
+            req.abort()
+            resolve()
+          }, 100)
+        }
+      })
+    }
+
+    describe.each([
+      ['middleware', '/middleware'],
+      ['edge app route handler', '/edge-route'],
+      ['node app route handler', '/node-route'],
+      ['edge pages api', '/api/edge-api'],
+      ['node pages api', '/api/node-api'],
+    ])('%s', (_name, path) => {
+      it('cancels stream making progress', async () => {
+        // If the stream is making regular progress, then we'll eventually hit
+        // the break because `res.destroyed` is true.
+        const url = path + '?write=25'
+        await prime(url)
+        const res = await next.fetch(url)
+        const i = +(await res.text())
+        expect(i).toBeWithin(1, 5)
+      }, 2500)
+
+      it('cancels stalled stream', async () => {
+        // If the stream is stalled, we'll never hit the `res.destroyed` break
+        // point, so this ensures we handle it with an out-of-band cancellation.
+        const url = path + '?write=1'
+        await prime(url)
+        const res = await next.fetch(url)
+        const i = +(await res.text())
+        expect(i).toBe(1)
+      }, 2500)
+
+      it('cancels stream that never sent data', async () => {
+        // If the server has never sent any data (including headers), then we
+        // haven't even established the response object yet.
+        const url = path + '?write=0'
+        await prime(url, true)
+        const res = await next.fetch(url)
+        const i = +(await res.text())
+        expect(i).toBe(0)
+      }, 2500)
+    })
+  }
+)
diff --git a/test/e2e/cancel-request/streamable.ts b/test/e2e/cancel-request/streamable.ts
new file mode 100644
index 000000000000000..73edfd49036fbb0
--- /dev/null
+++ b/test/e2e/cancel-request/streamable.ts
@@ -0,0 +1,30 @@
+import { Deferred, sleep } from './sleep'
+
+export function Streamable(write: number) {
+  const encoder = new TextEncoder()
+  const cleanedUp = new Deferred()
+  const aborted = new Deferred()
+  let i = 0
+
+  const streamable = {
+    finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i),
+
+    abort() {
+      aborted.resolve()
+    },
+    stream: new ReadableStream({
+      async pull(controller) {
+        if (i >= write) {
+          return
+        }
+
+        await sleep(100)
+        controller.enqueue(encoder.encode(String(i++)))
+      },
+      cancel() {
+        cleanedUp.resolve()
+      },
+    }),
+  }
+  return streamable
+}
diff --git a/test/integration/edge-runtime-streaming-error/test/index.test.ts b/test/integration/edge-runtime-streaming-error/test/index.test.ts
index bb76cd7f8834cd7..21c41676ecb867b 100644
--- a/test/integration/edge-runtime-streaming-error/test/index.test.ts
+++ b/test/integration/edge-runtime-streaming-error/test/index.test.ts
@@ -22,7 +22,7 @@ function test(context: ReturnType) {
   await waitFor(200)
   await check(
     () => stripAnsi(context.output),
-    new RegExp(`This ReadableStream did not return bytes.`, 'm')
+    new RegExp(`The first argument must be of type string`, 'm')
   )
   expect(stripAnsi(context.output)).not.toContain('webpack-internal:')
 }
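// Editorial sketch, not part of the patch: the pattern this change threads
// through the router, render, and export servers. An AbortSignal is derived
// from the response (not the request) and handed to downstream work, so a
// client disconnect cancels that work. The import path is the assumed compiled
// location of src/server/web/spec-extension/adapters/next-request.ts, and the
// upstream URL is a placeholder.
import { createServer } from 'http'
import { signalFromNodeResponse } from 'next/dist/server/web/spec-extension/adapters/next-request'

createServer(async (req, res) => {
  const signal = signalFromNodeResponse(res)
  try {
    // Any fetch (or invokeRequest) given this signal is aborted if the client
    // goes away before the upstream answers.
    const upstream = await fetch('http://127.0.0.1:4000/slow', { signal })
    res.end(await upstream.text())
  } catch (e: any) {
    // Same check as isAbortError() in src/server/pipe-readable.ts.
    if (e?.name !== 'AbortError') throw e
    // The client disconnected first; there is nothing left to send.
  }
}).listen(3000)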
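// Editorial sketch, not part of the patch: using the new pipeReadable helper
// with a Node ServerResponse. ServerResponse structurally satisfies PipeTarget
// (write, end, on, off, and flush once the compression middleware wraps it).
// The import path is the assumed compiled location of src/server/pipe-readable.ts.
import { createServer } from 'http'
import { pipeReadable } from 'next/dist/server/pipe-readable'

createServer(async (req, res) => {
  const body = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode('hello'))
      controller.close()
    },
  })
  res.statusCode = 200
  // If the client disconnects mid-stream, pipeReadable cancels the reader so
  // the source's cleanup runs, and it swallows the resulting AbortError.
  await pipeReadable(body, res)
}).listen(3000)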
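// Editorial sketch, not part of the patch: the reasoning behind
// `writer.closed.then(onClose, onClose)` in edge-page-bootstrap.ts and
// web-server.ts. If the transform stream is cancelled early, `writer.closed`
// rejects; `.finally(onClose)` would return a fresh rejected promise with no
// handler attached, which surfaces as an unhandled rejection. Passing onClose
// as both the fulfillment and the rejection handler observes either outcome
// without re-throwing.
declare const writer: WritableStreamDefaultWriter<Uint8Array>
declare function onClose(): void

// Problematic under early cancellation:
//   writer.closed.finally(onClose)
// Safe, because both settle paths are handled:
writer.closed.then(onClose, onClose)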