Skip to content

Commit

Permalink
Reimplement stream cancellation (vercel#52281)
Browse files Browse the repository at this point in the history
### What?

This reimplements our stream cancellation code for a few more cases:
1. Adds support in all stream-returning APIs
2. Fixes cancellation detection in node 16
3. Implements out-of-band detection, so can cancel in the middle of a
read

It also (finally) adds tests for all the cases I'm aware of.

### Why?

To allow disconnecting from an AI service when a client disconnects. $$$

### How?

1. Reuses a single pipe function in all paths to push data from the
dev's `ReadableStream` into our `ServerResponse`
2. Uses `ServerResponse` to detect disconnect, instead of the
`IncomingMessage` (request)
    - The `close` event fire once all incoming body data is read
- The request `abort` event will not fire after the incoming body data
has been fully read
3. Using `on('close')` on the writable destination allows us to detect
close
- Checking for `res.destroyed` in the body of the loop meant we had to
wait for the `await stream.read()` to complete before we could possibly
cancel the stream

- - -

vercel#52157 (and vercel#51594) had an issue with Node 16, because I was using
`res.closed` to detect when the server response was closed by the client
disconnecting. But, `closed` wasn't
[added](nodejs/node#45672) until
[v18.13.0](https://nodejs.org/en/blog/release/v18.13.0#:~:text=%5Bcbd710bbf4%5D%20%2D%20http%3A%20make%20OutgoingMessage%20more%20streamlike%20(Robert%20Nagy)%20%2345672).
This fixes it by using `res.destroyed`.

Reverts vercel#52277
Relands vercel#52157
Fixes vercel#52809

---------
  • Loading branch information
jridgewell authored and Strift committed Jul 27, 2023
1 parent d98709c commit 0117968
Show file tree
Hide file tree
Showing 29 changed files with 667 additions and 176 deletions.
Expand Up @@ -80,11 +80,27 @@ async function render(request: NextRequest, event: NextFetchEvent) {
response.headers.append('Vary', RSC_VARY_HEADER)

const writer = tranform.writable.getWriter()
result.pipe({

let innerClose: undefined | (() => void)
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (reason?: Error) => writer.abort(reason),
})

on(_event: 'close', cb: () => void) {
innerClose = cb
},
off(_event: 'close', _cb: () => void) {
innerClose = undefined
},
}
const onClose = () => {
innerClose?.()
}
// No, this cannot be replaced with `finally`, because early cancelling
// the stream will create a rejected promise, and finally will create an
// unhandled rejection.
writer.closed.then(onClose, onClose)
result.pipe(target)

return response
}
Expand Down
Expand Up @@ -11,7 +11,10 @@ import {
NodeNextResponse,
} from 'next/dist/server/base-http/node'
import { sendResponse } from 'next/dist/server/send-response'
import { NextRequestAdapter } from 'next/dist/server/web/spec-extension/adapters/next-request'
import {
NextRequestAdapter,
signalFromNodeResponse,
} from 'next/dist/server/web/spec-extension/adapters/next-request'
import { RouteHandlerManagerContext } from 'next/dist/server/future/route-handler-managers/route-handler-manager'

import { attachRequestMeta } from './next-request-helpers'
Expand Down Expand Up @@ -43,7 +46,10 @@ export default (routeModule: RouteModule) => {
}

const routeResponse = await routeModule.handle(
NextRequestAdapter.fromNodeNextRequest(req),
NextRequestAdapter.fromNodeNextRequest(
req,
signalFromNodeResponse(response)
),
context
)

Expand Down
8 changes: 6 additions & 2 deletions packages/next/src/export/worker.ts
Expand Up @@ -45,7 +45,10 @@ import { NodeNextRequest } from '../server/base-http/node'
import { isAppRouteRoute } from '../lib/is-app-route-route'
import { toNodeOutgoingHttpHeaders } from '../server/web/utils'
import { RouteModuleLoader } from '../server/future/helpers/module-loader/route-module-loader'
import { NextRequestAdapter } from '../server/web/spec-extension/adapters/next-request'
import {
NextRequestAdapter,
signalFromNodeResponse,
} from '../server/web/spec-extension/adapters/next-request'
import * as ciEnvironment from '../telemetry/ci-info'

const envConfig = require('../shared/lib/runtime-config')
Expand Down Expand Up @@ -388,7 +391,8 @@ export default async function exportPage({
// Ensure that the url for the page is absolute.
req.url = `http://localhost:3000${req.url}`
const request = NextRequestAdapter.fromNodeNextRequest(
new NodeNextRequest(req)
new NodeNextRequest(req),
signalFromNodeResponse(res)
)

// Create the context for the handler. This contains the params from
Expand Down
8 changes: 7 additions & 1 deletion packages/next/src/server/base-server.ts
Expand Up @@ -116,6 +116,7 @@ import {
type RouteMatch,
} from './future/route-matches/route-match'
import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path'
import { signalFromNodeResponse } from './web/spec-extension/adapters/next-request'

export type FindComponentsResult = {
components: LoadComponentsReturnType
Expand Down Expand Up @@ -1837,7 +1838,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {

try {
// Handle the match and collect the response if it's a static response.
const response = await this.handlers.handle(match, req, context)
const response = await this.handlers.handle(
match,
req,
context,
signalFromNodeResponse((res as NodeNextResponse).originalResponse)
)

;(req as any).fetchMetrics = (
context.staticGenerationContext as any
Expand Down
Expand Up @@ -24,7 +24,8 @@ export class RouteHandlerManager {
public async handle(
match: AppRouteRouteMatch,
req: BaseNextRequest,
context: RouteHandlerManagerContext
context: RouteHandlerManagerContext,
signal: AbortSignal
): Promise<Response> {
// The module supports minimal mode, load the minimal module.
const module = await RouteModuleLoader.load<RouteModule>(
Expand All @@ -33,7 +34,7 @@ export class RouteHandlerManager {
)

// Convert the BaseNextRequest to a NextRequest.
const request = NextRequestAdapter.fromBaseNextRequest(req)
const request = NextRequestAdapter.fromBaseNextRequest(req, signal)

// Get the response from the handler and send it back.
return await module.handle(request, context)
Expand Down
21 changes: 13 additions & 8 deletions packages/next/src/server/lib/route-resolver.ts
Expand Up @@ -17,9 +17,9 @@ import { proxyRequest } from './router-utils/proxy-request'
import { getResolveRoutes } from './router-utils/resolve-routes'
import { PERMANENT_REDIRECT_STATUS } from '../../shared/lib/constants'
import { splitCookiesString, toNodeOutgoingHttpHeaders } from '../web/utils'
import { signalFromNodeRequest } from '../web/spec-extension/adapters/next-request'
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'
import { getMiddlewareRouteMatcher } from '../../shared/lib/router/utils/middleware-route-matcher'
import { pipeReadable } from './server-ipc/invoke-request'
import { pipeReadable } from '../pipe-readable'

type RouteResult =
| {
Expand Down Expand Up @@ -132,7 +132,7 @@ export async function makeResolver(
serverAddr.port || 3000
}${req.url}`,
body: cloneableBody,
signal: signalFromNodeRequest(req),
signal: signalFromNodeResponse(res),
},
useCache: true,
onWarning: console.warn,
Expand Down Expand Up @@ -160,11 +160,11 @@ export async function makeResolver(
}
res.statusCode = result.response.status

for await (const chunk of result.response.body || ([] as any)) {
if (res.closed) break
res.write(chunk)
if (result.response.body) {
await pipeReadable(result.response.body, res)
} else {
res.end()
}
res.end()
} catch (err) {
console.error(err)
res.statusCode = 500
Expand Down Expand Up @@ -218,7 +218,12 @@ export async function makeResolver(
req: IncomingMessage,
res: ServerResponse
): Promise<RouteResult | void> {
const routeResult = await resolveRoutes(req, new Set(), false)
const routeResult = await resolveRoutes(
req,
new Set(),
false,
signalFromNodeResponse(res)
)
const {
matchedOutput,
bodyStream,
Expand Down
42 changes: 31 additions & 11 deletions packages/next/src/server/lib/router-server.ts
Expand Up @@ -15,7 +15,8 @@ import { filterReqHeaders } from './server-ipc/utils'
import { findPagesDir } from '../../lib/find-pages-dir'
import { setupFsCheck } from './router-utils/filesystem'
import { proxyRequest } from './router-utils/proxy-request'
import { invokeRequest, pipeReadable } from './server-ipc/invoke-request'
import { invokeRequest } from './server-ipc/invoke-request'
import { isAbortError, pipeReadable } from '../pipe-readable'
import { createRequestResponseMocks } from './mock-request'
import { createIpcServer, createWorker } from './server-ipc'
import { UnwrapPromise } from '../../lib/coalesced-function'
Expand All @@ -29,6 +30,7 @@ import {
PHASE_DEVELOPMENT_SERVER,
PERMANENT_REDIRECT_STATUS,
} from '../../shared/lib/constants'
import { signalFromNodeResponse } from '../web/spec-extension/adapters/next-request'

let initializeResult:
| undefined
Expand Down Expand Up @@ -331,14 +333,26 @@ export async function initialize(opts: {

debug('invokeRender', renderUrl, invokeHeaders)

const invokeRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
let invokeRes
try {
invokeRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
signal: signalFromNodeResponse(res),
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object (when
// the headers are flushed), then we can early exit without further
// processing.
if (isAbortError(e)) {
return
}
throw e
}

debug('invokeRender res', invokeRes.status, invokeRes.headers)

Expand Down Expand Up @@ -419,7 +433,12 @@ export async function initialize(opts: {
resHeaders,
bodyStream,
matchedOutput,
} = await resolveRoutes(req, matchedDynamicRoutes, false)
} = await resolveRoutes(
req,
matchedDynamicRoutes,
false,
signalFromNodeResponse(res)
)

if (devInstance && matchedOutput?.type === 'devVirtualFsItem') {
const origUrl = req.url || '/'
Expand Down Expand Up @@ -687,7 +706,8 @@ export async function initialize(opts: {
const { matchedOutput, parsedUrl } = await resolveRoutes(
req,
new Set(),
true
true,
signalFromNodeResponse(socket)
)

// TODO: allow upgrade requests to pages/app paths?
Expand Down
20 changes: 19 additions & 1 deletion packages/next/src/server/lib/router-utils/proxy-request.ts
Expand Up @@ -34,6 +34,24 @@ export async function proxyRequest(
await new Promise((proxyResolve, proxyReject) => {
let finished = false

// http-proxy does not properly detect a client disconnect in newer
// versions of Node.js. This is caused because it only listens for the
// `aborted` event on the our request object, but it also fully reads
// and closes the request object. Node **will not** fire `aborted` when
// the request is already closed. Listening for `close` on our response
// object will detect the disconnect, and we can abort the proxy's
// connection.
proxy.on('proxyReq', (proxyReq) => {
res.on('close', () => proxyReq.destroy())
})
proxy.on('proxyRes', (proxyRes) => {
if (res.destroyed) {
proxyRes.destroy()
} else {
res.on('close', () => proxyRes.destroy())
}
})

proxy.on('proxyRes', (proxyRes, innerReq, innerRes) => {
const cleanup = (err: any) => {
// cleanup event listeners to allow clean garbage collection
Expand All @@ -59,7 +77,7 @@ export async function proxyRequest(
finished = true
proxyReject(err)

if (!res.closed) {
if (!res.destroyed) {
res.statusCode = 500
res.end('Internal Server Error')
}
Expand Down
37 changes: 28 additions & 9 deletions packages/next/src/server/lib/router-utils/resolve-routes.ts
Expand Up @@ -13,6 +13,7 @@ import { Header } from '../../../lib/load-custom-routes'
import { stringifyQuery } from '../../server-route-utils'
import { toNodeOutgoingHttpHeaders } from '../../web/utils'
import { invokeRequest } from '../server-ipc/invoke-request'
import { isAbortError } from '../../pipe-readable'
import { getCookieParser, setLazyProp } from '../../api-utils'
import { getHostname } from '../../../shared/lib/get-hostname'
import { UnwrapPromise } from '../../../lib/coalesced-function'
Expand Down Expand Up @@ -93,7 +94,8 @@ export function getResolveRoutes(
async function resolveRoutes(
req: IncomingMessage,
matchedDynamicRoutes: Set<string>,
isUpgradeReq?: boolean
isUpgradeReq: boolean,
signal: AbortSignal
): Promise<{
finished: boolean
statusCode?: number
Expand Down Expand Up @@ -453,14 +455,31 @@ export function getResolveRoutes(

debug('invoking middleware', renderUrl, invokeHeaders)

const middlewareRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
let middlewareRes
try {
middlewareRes = await invokeRequest(
renderUrl,
{
headers: invokeHeaders,
method: req.method,
signal,
},
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object
// (when the headers are flushed), then we can early exit without
// further processing.
if (isAbortError(e)) {
return {
parsedUrl,
resHeaders,
finished: true,
}
}
throw e
}

const middlewareHeaders = toNodeOutgoingHttpHeaders(
middlewareRes.headers
) as Record<string, string | string[] | undefined>
Expand Down
33 changes: 4 additions & 29 deletions packages/next/src/server/lib/server-ipc/invoke-request.ts
@@ -1,14 +1,15 @@
import '../../node-polyfill-fetch'

import type { IncomingMessage } from 'http'
import type { Writable, Readable } from 'stream'
import type { Readable } from 'stream'
import { filterReqHeaders } from './utils'

export const invokeRequest = async (
targetUrl: string,
requestInit: {
headers: IncomingMessage['headers']
method: IncomingMessage['method']
signal?: AbortSignal
},
readableBody?: Readable | ReadableStream
) => {
Expand All @@ -22,10 +23,11 @@ export const invokeRequest = async (
...requestInit.headers,
}) as IncomingMessage['headers']

const invokeRes = await fetch(parsedTargetUrl.toString(), {
return await fetch(parsedTargetUrl.toString(), {
headers: invokeHeaders as any as Headers,
method: requestInit.method,
redirect: 'manual',
signal: requestInit.signal,

...(requestInit.method !== 'GET' &&
requestInit.method !== 'HEAD' &&
Expand All @@ -41,31 +43,4 @@ export const invokeRequest = async (
internal: true,
},
})

return invokeRes
}

export async function pipeReadable(
readable: ReadableStream,
writable: Writable
) {
const reader = readable.getReader()

async function doRead() {
const item = await reader.read()

if (item?.value) {
writable.write(Buffer.from(item?.value))

if ('flush' in writable) {
;(writable as any).flush()
}
}

if (!item?.done) {
return doRead()
}
}
await doRead()
writable.end()
}

0 comments on commit 0117968

Please sign in to comment.