Skip to content

Commit

Permalink
Cleanup and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jridgewell committed Jul 22, 2023
1 parent 12011fc commit 65ccecb
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { createManifests, installRequireAndChunkLoad } from './manifest'
import type { NextRequest, NextFetchEvent } from 'next/server'
import type { RenderOpts } from 'next/dist/server/app-render/types'
import type { ParsedUrlQuery } from 'querystring'
import EventEmitter from 'events'

installRequireAndChunkLoad()

Expand Down Expand Up @@ -85,18 +86,15 @@ async function render(request: NextRequest, event: NextFetchEvent) {
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (reason?: Error) => writer.abort(reason),

on(_event: 'close', cb: () => void) {
innerClose = cb
},
off(_event: 'close', _cb: () => void) {
innerClose = undefined
},
destroyed: false,
}
const onClose = () => {
target.destroyed = true
innerClose?.()
}
// No, this cannot be replaced with `finally`, because early cancelling
Expand Down
6 changes: 5 additions & 1 deletion packages/next/src/server/lib/router-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ export async function initialize(opts: {
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object (when
// the headers are flushed), then we can early exit without further
// processing.
if (isAbortError(e)) {
return
}
Expand Down Expand Up @@ -706,7 +709,8 @@ export async function initialize(opts: {
const { matchedOutput, parsedUrl } = await resolveRoutes(
req,
new Set(),
true
true,
signalFromNodeResponse(socket)
)

// TODO: allow upgrade requests to pages/app paths?
Expand Down
3 changes: 3 additions & 0 deletions packages/next/src/server/lib/router-utils/resolve-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ export function getResolveRoutes(
getRequestMeta(req, '__NEXT_CLONABLE_BODY')?.cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object
// (when the headers are flushed), then we can early exit without
// further processing.
if (isAbortError(e)) {
return {
parsedUrl,
Expand Down
63 changes: 51 additions & 12 deletions packages/next/src/server/lib/server-ipc/invoke-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,40 @@ export const invokeRequest = async (
})
}

/**
* This is a minimal implementation of a Writable with just enough
* functionality to handle stream cancellation.
*/
export interface PipeTarget {
/**
* Called when new data is read from readable source.
*/
write: (chunk: Uint8Array) => unknown

/**
* Always called once we read all data (if the writable isn't already
* destroyed by a client disconnect).
*/
end: () => unknown

/**
* An optional method which is called after every write, to support
* immediately streaming in gzip responses.
*/
flush?: () => unknown
destroy: (err?: Error) => unknown

// These are necessary for us to detect client disconnect and cancel streaming.
/**
* The close event listener is necessary for us to detect an early client
* disconnect while we're attempting to read data. This must be done
* out-of-band so that we can cancel the readable (else we'd have to wait for
* the readable to produce more data before we could tell it to cancel).
*/
on: (event: 'close', cb: () => void) => void

/**
* Allows us to cleanup our onClose listener.
*/
off: (event: 'close', cb: () => void) => void
get destroyed(): boolean
}

export async function pipeReadable(
Expand All @@ -63,23 +87,34 @@ export async function pipeReadable(
) {
const reader = readable.getReader()
let readerDone = false
let writableClosed = false

// It's not enough just to check for `writable.destroyed`, because the client
// may disconnect while we're waiting for a read. We need to immediately
// cancel the readable, and that requires an out-of-band listener.
function onClose() {
writableClosed = true
writable.off?.('close', onClose)

// If the reader is not yet done, we need to cancel it so that the stream
// source's resources can be cleaned up. If a read is in-progress, this
// will also ensure the read promise rejects and frees our resources.
if (!readerDone) {
readerDone = true
reader.cancel().catch(() => {})
}
}
writable.on?.('close', onClose)

const id = String(Math.random()).slice(2, 5)
try {
while (true) {
// If the read throws, then the reader is done. If not, then we'll set
// readerDone to the actual done value after the read.
readerDone = true
const { done, value } = await reader.read()

readerDone = done
if (done || writable.destroyed) {

if (done || writableClosed) {
break
}

Expand All @@ -89,20 +124,24 @@ export async function pipeReadable(
}
}
} catch (e) {
// Only the reader will throw an error, and if it does then we know that it is done.
readerDone = true
// If the client disconnects, we don't want to emit an unhandled error.
if (!isAbortError(e)) {
throw e
}
} finally {
if (!writable.destroyed) {
writable.end()
}
writable.off?.('close', onClose)

// If we broke out of the loop because of a client disconnect, and the
// close event hasn't yet fired, we can early cancel.
if (!readerDone) {
reader.cancel().catch(() => {})
}
writable.off?.('close', onClose)

// If the client hasn't disconnected yet, end the writable so that the
// response sends the final bytes.
if (!writableClosed) {
writable.end()
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions packages/next/src/server/lib/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ export async function startServer({
getCloneableBody(req).cloneBodyStream()
)
} catch (e) {
// If the client aborts before we can receive a response object (when
// the headers are flushed), then we can early exit without further
// processing.
if (isAbortError(e)) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ export const streamToBufferedResult = async (
renderChunks.push(decodeText(chunk, textDecoder))
},
end() {},
destroy() {},

// We do not support stream cancellation
destroyed: false,
on() {},
off() {},
}
Expand Down
3 changes: 0 additions & 3 deletions packages/next/src/server/web-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,18 +379,15 @@ export default class NextWebServer extends BaseServer<WebServerOptions> {
const target = {
write: (chunk: Uint8Array) => writer.write(chunk),
end: () => writer.close(),
destroy: (err?: Error) => writer.abort(err),

on(_event: 'close', cb: () => void) {
innerClose = cb
},
off(_event: 'close', _cb: () => void) {
innerClose = undefined
},
destroyed: false,
}
const onClose = () => {
target.destroyed = true
innerClose?.()
}
// No, this cannot be replaced with `finally`, because early cancelling
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import type { BaseNextRequest } from '../../../base-http'
import type { NodeNextRequest } from '../../../base-http/node'
import type { WebNextRequest } from '../../../base-http/web'
import type { ServerResponse } from 'node:http'
import type { Writable } from 'node:stream'

import { getRequestMeta } from '../../../request-meta'
import { fromNodeOutgoingHttpHeaders } from '../../utils'
import { NextRequest } from '../request'

export function signalFromNodeResponse(response: ServerResponse) {
/**
* Creates an AbortSignal tied to the closing of a ServerResponse (or other
* appropriate Writable).
*
* This cannot be done with the request (IncomingMessage or Readable) because
* the `abort` event will not fire if to data has been fully read (because that
* will "close" the readable stream and nothing fires after that).
*/
export function signalFromNodeResponse(response: Writable) {
const { errored, destroyed } = response
if (errored || destroyed) return AbortSignal.abort(errored)

Expand Down

0 comments on commit 65ccecb

Please sign in to comment.