Skip to content

Commit

Permalink
Handle rewriting WebSocket requests (#39463)
Browse files Browse the repository at this point in the history
This ensures we properly handle rewrites when the request is a WebSocket request. This also adds an integration test to ensure it is working as expected in dev and production mode. 

## Bug

- [x] Related issues linked using `fixes #number`
- [x] Integration tests added
- [ ] Errors have helpful link attached, see `contributing.md`

Fixes: #32634
Closes: #38455
  • Loading branch information
ijjk committed Aug 10, 2022
1 parent d64a150 commit b15a976
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 21 deletions.
10 changes: 10 additions & 0 deletions packages/next/server/base-server.ts
Expand Up @@ -119,6 +119,10 @@ export interface Options {
* The port the server is running behind
*/
port?: number
/**
* The HTTP Server that Next.js is running behind
*/
httpServer?: import('http').Server
}

export interface BaseRequestHandler {
Expand Down Expand Up @@ -665,6 +669,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {
return this.handleRequest.bind(this)
}

protected async handleUpgrade(
_req: BaseNextRequest,
_socket: any,
_head?: any
): Promise<void> {}

public setAssetPrefix(prefix?: string): void {
this.renderOpts.assetPrefix = prefix ? prefix.replace(/\/$/, '') : ''
}
Expand Down
6 changes: 2 additions & 4 deletions packages/next/server/dev/next-dev-server.ts
Expand Up @@ -83,10 +83,6 @@ const ReactDevOverlay = (props: any) => {
}

export interface Options extends ServerOptions {
/**
* The HTTP Server that Next.js is running behind
*/
httpServer?: HTTPServer
/**
* Tells of Next.js is running from the `next dev` command
*/
Expand Down Expand Up @@ -629,6 +625,8 @@ export default class DevServer extends Server {
)
) {
this.hotReloader?.onHMR(req, socket, head)
} else {
this.handleUpgrade(req, socket, head)
}
})
}
Expand Down
9 changes: 9 additions & 0 deletions packages/next/server/lib/start-server.ts
Expand Up @@ -39,6 +39,14 @@ export function startServer(opts: StartServerOptions) {
}
})

let upgradeHandler: any

if (!opts.dev) {
server.on('upgrade', (req, socket, upgrade) => {
upgradeHandler(req, socket, upgrade)
})
}

server.on('listening', () => {
const addr = server.address()
const hostname =
Expand All @@ -55,6 +63,7 @@ export function startServer(opts: StartServerOptions) {
})

requestHandler = app.getRequestHandler()
upgradeHandler = app.getUpgradeHandler()
resolve(app)
})

Expand Down
52 changes: 39 additions & 13 deletions packages/next/server/next-server.ts
Expand Up @@ -54,6 +54,7 @@ import { apiResolver } from './api-utils/node'
import { RenderOpts, renderToHTML } from './render'
import { renderToHTMLOrFlight as appRenderToHTMLOrFlight } from './app-render'
import { ParsedUrl, parseUrl } from '../shared/lib/router/utils/parse-url'
import { parse as nodeParseUrl } from 'url'
import * as Log from '../build/output/log'
import loadRequireHook from '../build/webpack/require-hook'

Expand Down Expand Up @@ -501,10 +502,15 @@ export default class NextNodeServer extends BaseServer {
}
}

protected async handleUpgrade(req: NodeNextRequest, socket: any, head: any) {
await this.router.execute(req, socket, nodeParseUrl(req.url, true), head)
}

protected async proxyRequest(
req: NodeNextRequest,
res: NodeNextResponse,
parsedUrl: ParsedUrl
parsedUrl: ParsedUrl,
upgradeHead?: any
) {
const { query } = parsedUrl
delete (parsedUrl as any).query
Expand All @@ -516,27 +522,46 @@ export default class NextNodeServer extends BaseServer {
changeOrigin: true,
ignorePath: true,
xfwd: true,
proxyTimeout: 30_000, // limit proxying to 30 seconds
ws: true,
// we limit proxy requests to 30s by default, in development
// we don't time out WebSocket requests to allow proxying
proxyTimeout: upgradeHead && this.renderOpts.dev ? undefined : 30_000,
})

await new Promise((proxyResolve, proxyReject) => {
let finished = false

proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.on('error', (err) => {
console.error(`Failed to proxy ${target}`, err)
if (!finished) {
finished = true
proxyReject(err)
}
})
proxy.web(req.originalRequest, res.originalResponse)

// if upgrade head is present treat as WebSocket request
if (upgradeHead) {
proxy.on('proxyReqWs', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.ws(req as any as IncomingMessage, res, upgradeHead)
proxyResolve(true)
} else {
proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.web(req.originalRequest, res.originalResponse)
}
})

return {
Expand Down Expand Up @@ -989,7 +1014,7 @@ export default class NextNodeServer extends BaseServer {
matchesLocale: true,
matchesLocaleAPIRoutes: true,
matchesTrailingSlash: true,
fn: async (req, res, params, parsedUrl) => {
fn: async (req, res, params, parsedUrl, upgradeHead) => {
const { newUrl, parsedDestination } = prepareDestination({
appendParamsToQuery: true,
destination: rewriteRoute.destination,
Expand All @@ -1002,7 +1027,8 @@ export default class NextNodeServer extends BaseServer {
return this.proxyRequest(
req as NodeNextRequest,
res as NodeNextResponse,
parsedDestination
parsedDestination,
upgradeHead
)
}

Expand Down
9 changes: 9 additions & 0 deletions packages/next/server/next.ts
Expand Up @@ -62,6 +62,15 @@ export class NextServer {
}
}

getUpgradeHandler() {
return async (req: IncomingMessage, socket: any, head: any) => {
const server = await this.getServer()
// @ts-expect-error we mark this as protected so it
// causes an error here
return server.handleUpgrade.apply(server, [req, socket, head])
}
}

setAssetPrefix(assetPrefix: string) {
if (this.server) {
this.server.setAssetPrefix(assetPrefix)
Expand Down
14 changes: 11 additions & 3 deletions packages/next/server/router.ts
Expand Up @@ -39,7 +39,8 @@ export type Route = {
req: BaseNextRequest,
res: BaseNextResponse,
params: Params,
parsedUrl: NextUrlWithParsedQuery
parsedUrl: NextUrlWithParsedQuery,
upgradeHead?: any
) => Promise<RouteResult> | RouteResult
}

Expand Down Expand Up @@ -130,7 +131,8 @@ export default class Router {
async execute(
req: BaseNextRequest,
res: BaseNextResponse,
parsedUrl: NextUrlWithParsedQuery
parsedUrl: NextUrlWithParsedQuery,
upgradeHead?: any
): Promise<boolean> {
if (this.seenRequests.has(req)) {
throw new Error(
Expand Down Expand Up @@ -306,6 +308,11 @@ export default class Router {
]

for (const testRoute of allRoutes) {
// only process rewrites for upgrade request
if (upgradeHead && testRoute.type !== 'rewrite') {
continue
}

const originalPathname = parsedUrlUpdated.pathname as string
const pathnameInfo = getNextPathnameInfo(originalPathname, {
nextConfig: this.nextConfig,
Expand Down Expand Up @@ -388,7 +395,8 @@ export default class Router {
req,
res,
newParams,
parsedUrlUpdated
parsedUrlUpdated,
upgradeHead
)

if (result.finished) {
Expand Down
5 changes: 5 additions & 0 deletions test/integration/custom-routes/next.config.js
Expand Up @@ -12,6 +12,11 @@ module.exports = {
},
]
: []),
{
source: '/to-websocket',
destination:
'http://localhost:__EXTERNAL_PORT__/_next/webpack-hmr?page=/about',
},
{
source: '/to-nowhere',
destination: 'http://localhost:12233',
Expand Down
39 changes: 38 additions & 1 deletion test/integration/custom-routes/test/index.test.js
Expand Up @@ -5,6 +5,7 @@ import url from 'url'
import stripAnsi from 'strip-ansi'
import fs from 'fs-extra'
import { join } from 'path'
import WebSocket from 'ws'
import cheerio from 'cheerio'
import webdriver from 'next-webdriver'
import escapeRegex from 'escape-string-regexp'
Expand Down Expand Up @@ -39,6 +40,29 @@ let appPort
let app

const runTests = (isDev = false) => {
it('should successfully rewrite a WebSocket request', async () => {
const messages = []
const ws = await new Promise((resolve, reject) => {
let socket = new WebSocket(`ws://localhost:${appPort}/to-websocket`)
socket.on('message', (data) => {
messages.push(data.toString())
})
socket.on('open', () => resolve(socket))
socket.on('error', (err) => {
console.error(err)
socket.close()
reject()
})
})

await check(
() => (messages.length > 0 ? 'success' : JSON.stringify(messages)),
'success'
)
ws.close()
expect([...externalServerHits]).toEqual(['/_next/webpack-hmr?page=/about'])
})

it('should not rewrite for _next/data route when a match is found', async () => {
const initial = await fetchViaHTTP(appPort, '/overridden/first')
expect(initial.status).toBe(200)
Expand Down Expand Up @@ -1809,6 +1833,11 @@ const runTests = (isDev = false) => {
},
],
afterFiles: [
{
destination: `http://localhost:${externalServerPort}/_next/webpack-hmr?page=/about`,
regex: normalizeRegEx('^\\/to-websocket(?:\\/)?$'),
source: '/to-websocket',
},
{
destination: 'http://localhost:12233',
regex: normalizeRegEx('^\\/to-nowhere(?:\\/)?$'),
Expand Down Expand Up @@ -2235,6 +2264,14 @@ describe('Custom routes', () => {
const externalHost = req.headers['host']
res.end(`hi ${nextHost} from ${externalHost}`)
})
const wsServer = new WebSocket.Server({ noServer: true })

externalServer.on('upgrade', (req, socket, head) => {
externalServerHits.add(req.url)
wsServer.handleUpgrade(req, socket, head, (client) => {
client.send('hello world')
})
})
await new Promise((resolve, reject) => {
externalServer.listen(externalServerPort, (error) => {
if (error) return reject(error)
Expand All @@ -2244,7 +2281,7 @@ describe('Custom routes', () => {
nextConfigRestoreContent = await fs.readFile(nextConfigPath, 'utf8')
await fs.writeFile(
nextConfigPath,
nextConfigRestoreContent.replace(/__EXTERNAL_PORT__/, externalServerPort)
nextConfigRestoreContent.replace(/__EXTERNAL_PORT__/g, externalServerPort)
)
})
afterAll(async () => {
Expand Down

0 comments on commit b15a976

Please sign in to comment.