Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle rewriting WebSocket requests #39463

Merged
merged 2 commits into from Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/next/server/base-server.ts
Expand Up @@ -119,6 +119,10 @@ export interface Options {
* The port the server is running behind
*/
port?: number
/**
* The HTTP Server that Next.js is running behind
*/
httpServer?: import('http').Server
}

export interface BaseRequestHandler {
Expand Down Expand Up @@ -665,6 +669,12 @@ export default abstract class Server<ServerOptions extends Options = Options> {
return this.handleRequest.bind(this)
}

protected async handleUpgrade(
_req: BaseNextRequest,
_socket: any,
_head?: any
): Promise<void> {}

public setAssetPrefix(prefix?: string): void {
this.renderOpts.assetPrefix = prefix ? prefix.replace(/\/$/, '') : ''
}
Expand Down
6 changes: 2 additions & 4 deletions packages/next/server/dev/next-dev-server.ts
Expand Up @@ -83,10 +83,6 @@ const ReactDevOverlay = (props: any) => {
}

export interface Options extends ServerOptions {
/**
* The HTTP Server that Next.js is running behind
*/
httpServer?: HTTPServer
/**
* Tells of Next.js is running from the `next dev` command
*/
Expand Down Expand Up @@ -629,6 +625,8 @@ export default class DevServer extends Server {
)
) {
this.hotReloader?.onHMR(req, socket, head)
} else {
this.handleUpgrade(req, socket, head)
}
})
}
Expand Down
9 changes: 9 additions & 0 deletions packages/next/server/lib/start-server.ts
Expand Up @@ -39,6 +39,14 @@ export function startServer(opts: StartServerOptions) {
}
})

let upgradeHandler: any

if (!opts.dev) {
server.on('upgrade', (req, socket, upgrade) => {
upgradeHandler(req, socket, upgrade)
})
}

server.on('listening', () => {
const addr = server.address()
const hostname =
Expand All @@ -55,6 +63,7 @@ export function startServer(opts: StartServerOptions) {
})

requestHandler = app.getRequestHandler()
upgradeHandler = app.getUpgradeHandler()
resolve(app)
})

Expand Down
52 changes: 39 additions & 13 deletions packages/next/server/next-server.ts
Expand Up @@ -54,6 +54,7 @@ import { apiResolver } from './api-utils/node'
import { RenderOpts, renderToHTML } from './render'
import { renderToHTMLOrFlight as appRenderToHTMLOrFlight } from './app-render'
import { ParsedUrl, parseUrl } from '../shared/lib/router/utils/parse-url'
import { parse as nodeParseUrl } from 'url'
import * as Log from '../build/output/log'
import loadRequireHook from '../build/webpack/require-hook'

Expand Down Expand Up @@ -501,10 +502,15 @@ export default class NextNodeServer extends BaseServer {
}
}

protected async handleUpgrade(req: NodeNextRequest, socket: any, head: any) {
await this.router.execute(req, socket, nodeParseUrl(req.url, true), head)
}

protected async proxyRequest(
req: NodeNextRequest,
res: NodeNextResponse,
parsedUrl: ParsedUrl
parsedUrl: ParsedUrl,
upgradeHead?: any
) {
const { query } = parsedUrl
delete (parsedUrl as any).query
Expand All @@ -516,27 +522,46 @@ export default class NextNodeServer extends BaseServer {
changeOrigin: true,
ignorePath: true,
xfwd: true,
proxyTimeout: 30_000, // limit proxying to 30 seconds
ws: true,
// we limit proxy requests to 30s by default, in development
// we don't time out WebSocket requests to allow proxying
proxyTimeout: upgradeHead && this.renderOpts.dev ? undefined : 30_000,
})

await new Promise((proxyResolve, proxyReject) => {
let finished = false

proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.on('error', (err) => {
console.error(`Failed to proxy ${target}`, err)
if (!finished) {
finished = true
proxyReject(err)
}
})
proxy.web(req.originalRequest, res.originalResponse)

// if upgrade head is present treat as WebSocket request
if (upgradeHead) {
proxy.on('proxyReqWs', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.ws(req as any as IncomingMessage, res, upgradeHead)
proxyResolve(true)
} else {
proxy.on('proxyReq', (proxyReq) => {
proxyReq.on('close', () => {
if (!finished) {
finished = true
proxyResolve(true)
}
})
})
proxy.web(req.originalRequest, res.originalResponse)
}
})

return {
Expand Down Expand Up @@ -989,7 +1014,7 @@ export default class NextNodeServer extends BaseServer {
matchesLocale: true,
matchesLocaleAPIRoutes: true,
matchesTrailingSlash: true,
fn: async (req, res, params, parsedUrl) => {
fn: async (req, res, params, parsedUrl, upgradeHead) => {
const { newUrl, parsedDestination } = prepareDestination({
appendParamsToQuery: true,
destination: rewriteRoute.destination,
Expand All @@ -1002,7 +1027,8 @@ export default class NextNodeServer extends BaseServer {
return this.proxyRequest(
req as NodeNextRequest,
res as NodeNextResponse,
parsedDestination
parsedDestination,
upgradeHead
)
}

Expand Down
9 changes: 9 additions & 0 deletions packages/next/server/next.ts
Expand Up @@ -62,6 +62,15 @@ export class NextServer {
}
}

getUpgradeHandler() {
return async (req: IncomingMessage, socket: any, head: any) => {
const server = await this.getServer()
// @ts-expect-error we mark this as protected so it
// causes an error here
return server.handleUpgrade.apply(server, [req, socket, head])
}
}

setAssetPrefix(assetPrefix: string) {
if (this.server) {
this.server.setAssetPrefix(assetPrefix)
Expand Down
14 changes: 11 additions & 3 deletions packages/next/server/router.ts
Expand Up @@ -39,7 +39,8 @@ export type Route = {
req: BaseNextRequest,
res: BaseNextResponse,
params: Params,
parsedUrl: NextUrlWithParsedQuery
parsedUrl: NextUrlWithParsedQuery,
upgradeHead?: any
) => Promise<RouteResult> | RouteResult
}

Expand Down Expand Up @@ -130,7 +131,8 @@ export default class Router {
async execute(
req: BaseNextRequest,
res: BaseNextResponse,
parsedUrl: NextUrlWithParsedQuery
parsedUrl: NextUrlWithParsedQuery,
upgradeHead?: any
): Promise<boolean> {
if (this.seenRequests.has(req)) {
throw new Error(
Expand Down Expand Up @@ -306,6 +308,11 @@ export default class Router {
]

for (const testRoute of allRoutes) {
// only process rewrites for upgrade request
if (upgradeHead && testRoute.type !== 'rewrite') {
continue
}

const originalPathname = parsedUrlUpdated.pathname as string
const pathnameInfo = getNextPathnameInfo(originalPathname, {
nextConfig: this.nextConfig,
Expand Down Expand Up @@ -388,7 +395,8 @@ export default class Router {
req,
res,
newParams,
parsedUrlUpdated
parsedUrlUpdated,
upgradeHead
)

if (result.finished) {
Expand Down
5 changes: 5 additions & 0 deletions test/integration/custom-routes/next.config.js
Expand Up @@ -12,6 +12,11 @@ module.exports = {
},
]
: []),
{
source: '/to-websocket',
destination:
'http://localhost:__EXTERNAL_PORT__/_next/webpack-hmr?page=/about',
},
{
source: '/to-nowhere',
destination: 'http://localhost:12233',
Expand Down
39 changes: 38 additions & 1 deletion test/integration/custom-routes/test/index.test.js
Expand Up @@ -5,6 +5,7 @@ import url from 'url'
import stripAnsi from 'strip-ansi'
import fs from 'fs-extra'
import { join } from 'path'
import WebSocket from 'ws'
import cheerio from 'cheerio'
import webdriver from 'next-webdriver'
import escapeRegex from 'escape-string-regexp'
Expand Down Expand Up @@ -39,6 +40,29 @@ let appPort
let app

const runTests = (isDev = false) => {
it('should successfully rewrite a WebSocket request', async () => {
const messages = []
const ws = await new Promise((resolve, reject) => {
let socket = new WebSocket(`ws://localhost:${appPort}/to-websocket`)
socket.on('message', (data) => {
messages.push(data.toString())
})
socket.on('open', () => resolve(socket))
socket.on('error', (err) => {
console.error(err)
socket.close()
reject()
})
})

await check(
() => (messages.length > 0 ? 'success' : JSON.stringify(messages)),
'success'
)
ws.close()
expect([...externalServerHits]).toEqual(['/_next/webpack-hmr?page=/about'])
})

it('should not rewrite for _next/data route when a match is found', async () => {
const initial = await fetchViaHTTP(appPort, '/overridden/first')
expect(initial.status).toBe(200)
Expand Down Expand Up @@ -1809,6 +1833,11 @@ const runTests = (isDev = false) => {
},
],
afterFiles: [
{
destination: `http://localhost:${externalServerPort}/_next/webpack-hmr?page=/about`,
regex: normalizeRegEx('^\\/to-websocket(?:\\/)?$'),
source: '/to-websocket',
},
{
destination: 'http://localhost:12233',
regex: normalizeRegEx('^\\/to-nowhere(?:\\/)?$'),
Expand Down Expand Up @@ -2235,6 +2264,14 @@ describe('Custom routes', () => {
const externalHost = req.headers['host']
res.end(`hi ${nextHost} from ${externalHost}`)
})
const wsServer = new WebSocket.Server({ noServer: true })

externalServer.on('upgrade', (req, socket, head) => {
externalServerHits.add(req.url)
wsServer.handleUpgrade(req, socket, head, (client) => {
client.send('hello world')
})
})
await new Promise((resolve, reject) => {
externalServer.listen(externalServerPort, (error) => {
if (error) return reject(error)
Expand All @@ -2244,7 +2281,7 @@ describe('Custom routes', () => {
nextConfigRestoreContent = await fs.readFile(nextConfigPath, 'utf8')
await fs.writeFile(
nextConfigPath,
nextConfigRestoreContent.replace(/__EXTERNAL_PORT__/, externalServerPort)
nextConfigRestoreContent.replace(/__EXTERNAL_PORT__/g, externalServerPort)
)
})
afterAll(async () => {
Expand Down