Skip to content

Commit

Permalink
Add response stream errorhandling in edge-function-runtime (#41102)
Browse files Browse the repository at this point in the history
The behaviour of edge-function-runtime in the case of an error was not
identical to the edge-runtime.
If a type other than "Uint8Array" is written to the Response stream a
unhandledreject is raised and logged.
The current implementations(nodejs) accepts also Buffers and Strings
which causes that a Application
Developer things our stream implementation is broken if it is executed
as worker.
We introduced a helper function to consume the response stream and write
the "Uint8Array" stream chunks
to the server implementation. Due to the complication that the error
side effect is emitted via the unhandledrejection
handler it is almost impossible to test --- jest does not allow testing
of the unhandlerejections.
We tested extendsiveliy the helper in the edge-runtime so that this PR
integrates just the consuming function.

## Bug

- [ ] Related issues linked using `fixes #number`
- [ ] Integration tests added
- [ ] Errors have a helpful link attached, see `contributing.md`

## Feature

- [ ] Implements an existing feature request or RFC. Make sure the
feature request has been accepted for implementation before opening a
PR.
- [ ] Related issues linked using `fixes #number`
- [ ] Integration tests added
- [ ] Documentation added
- [ ] Telemetry added. In case of a feature if it's used or not.
- [ ] Errors have a helpful link attached, see `contributing.md`

## Documentation / Examples

- [ ] Make sure the linting passes by running `pnpm lint`
- [ ] The "examples guidelines" are followed from [our contributing
doc](https://github.com/vercel/next.js/blob/canary/contributing/examples/adding-examples.md)

Co-authored-by: JJ Kasper <jj@jjsweb.site>
  • Loading branch information
mabels and ijjk committed Oct 6, 2022
1 parent 24b6003 commit 27fe5c8
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 30 deletions.
17 changes: 0 additions & 17 deletions packages/next/server/body-streams.ts
Expand Up @@ -17,23 +17,6 @@ export function requestToBodyStream(
})
}

export function bodyStreamToNodeStream(
bodyStream: ReadableStream<Uint8Array>
): Readable {
const reader = bodyStream.getReader()
return Readable.from(
(async function* () {
while (true) {
const { done, value } = await reader.read()
if (done) {
return
}
yield value
}
})()
)
}

function replaceRequestBody<T extends IncomingMessage>(
base: T,
stream: Readable
Expand Down
20 changes: 11 additions & 9 deletions packages/next/server/dev/next-dev-server.ts
Expand Up @@ -978,16 +978,18 @@ export default class DevServer extends Server {
try {
return await super.run(req, res, parsedUrl)
} catch (error) {
res.statusCode = 500
const err = getProperError(error)
try {
this.logErrorWithOriginalStack(err).catch(() => {})
return await this.renderError(err, req, res, pathname!, {
__NEXT_PAGE: (isError(err) && err.page) || pathname || '',
})
} catch (internalErr) {
console.error(internalErr)
res.body('Internal Server Error').send()
this.logErrorWithOriginalStack(err).catch(() => {})
if (!res.sent) {
res.statusCode = 500
try {
return await this.renderError(err, req, res, pathname!, {
__NEXT_PAGE: (isError(err) && err.page) || pathname || '',
})
} catch (internalErr) {
console.error(internalErr)
res.body('Internal Server Error').send()
}
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions packages/next/server/next-server.ts
Expand Up @@ -68,6 +68,7 @@ import { ParsedUrl, parseUrl } from '../shared/lib/router/utils/parse-url'
import { parse as nodeParseUrl } from 'url'
import * as Log from '../build/output/log'
import loadRequireHook from '../build/webpack/require-hook'
import { consumeUint8ArrayReadableStream } from 'next/dist/compiled/edge-runtime'

import BaseServer, {
Options,
Expand Down Expand Up @@ -95,7 +96,7 @@ import { getCustomRoute, stringifyQuery } from './server-route-utils'
import { urlQueryToSearchParams } from '../shared/lib/router/utils/querystring'
import { removeTrailingSlash } from '../shared/lib/router/utils/remove-trailing-slash'
import { getNextPathnameInfo } from '../shared/lib/router/utils/get-next-pathname-info'
import { bodyStreamToNodeStream, getClonableBody } from './body-streams'
import { getClonableBody } from './body-streams'
import { checkIsManualRevalidate } from './api-utils'
import { shouldUseReactRoot, isTargetLikeServerless } from './utils'
import ResponseCache from './response-cache'
Expand Down Expand Up @@ -2122,9 +2123,16 @@ export default class NextNodeServer extends BaseServer {

if (result.response.body) {
// TODO(gal): not sure that we always need to stream
bodyStreamToNodeStream(result.response.body).pipe(
(params.res as NodeNextResponse).originalResponse
)
const nodeResStream = (params.res as NodeNextResponse).originalResponse
try {
for await (const chunk of consumeUint8ArrayReadableStream(
result.response.body
)) {
nodeResStream.write(chunk)
}
} finally {
nodeResStream.end()
}
} else {
;(params.res as NodeNextResponse).originalResponse.end()
}
Expand Down
14 changes: 14 additions & 0 deletions test/integration/edge-runtime-streaming-error/pages/api/test.js
@@ -0,0 +1,14 @@
export const config = {
runtime: 'experimental-edge',
}
export default function () {
return new Response(
new ReadableStream({
start(ctr) {
ctr.enqueue(new TextEncoder().encode('hello'))
ctr.enqueue(true)
ctr.close()
},
})
)
}
81 changes: 81 additions & 0 deletions test/integration/edge-runtime-streaming-error/test/index.test.ts
@@ -0,0 +1,81 @@
import stripAnsi from 'next/dist/compiled/strip-ansi'
import {
fetchViaHTTP,
findPort,
killApp,
launchApp,
nextBuild,
nextStart,
waitFor,
} from 'next-test-utils'
import path from 'path'
import { remove } from 'fs-extra'

const appDir = path.join(__dirname, '..')

function test(context: ReturnType<typeof createContext>) {
return async () => {
const res = await fetchViaHTTP(context.appPort, '/api/test')
expect(await res.text()).toEqual('hello')
expect(res.status).toBe(200)
await waitFor(200)
const santizedOutput = stripAnsi(context.output)
expect(santizedOutput).toMatch(
new RegExp(`TypeError: This ReadableStream did not return bytes.`, 'm')
)
expect(santizedOutput).not.toContain('webpack-internal:')
}
}

function createContext() {
const ctx = {
output: '',
appPort: -1,
app: undefined,
handler: {
onStdout(msg) {
this.output += msg
},
onStderr(msg) {
this.output += msg
},
},
}
ctx.handler.onStderr = ctx.handler.onStderr.bind(ctx)
ctx.handler.onStdout = ctx.handler.onStdout.bind(ctx)
return ctx
}

describe('dev mode', () => {
const context = createContext()

beforeAll(async () => {
context.appPort = await findPort()
context.app = await launchApp(appDir, context.appPort, {
...context.handler,
env: { __NEXT_TEST_WITH_DEVTOOL: 1 },
})
})

afterAll(() => killApp(context.app))

it('logs the error correctly', test(context))
})

describe('production mode', () => {
const context = createContext()

beforeAll(async () => {
await remove(path.join(appDir, '.next'))
await nextBuild(appDir, undefined, {
stderr: true,
stdout: true,
})
context.appPort = await findPort()
context.app = await nextStart(appDir, context.appPort, {
...context.handler,
})
})
afterAll(() => killApp(context.app))
it('logs the error correctly', test(context))
})

0 comments on commit 27fe5c8

Please sign in to comment.