Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix only byte stream writing is allowed in CF workers #34893

Merged
merged 5 commits into from Feb 28, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
181 changes: 84 additions & 97 deletions packages/next/server/render.tsx
Expand Up @@ -304,17 +304,15 @@ const rscCache = new Map()

function createRSCHook() {
return (
writable: WritableStream<string>,
writable: WritableStream<Uint8Array>,
id: string,
req: ReadableStream<string>,
req: ReadableStream<Uint8Array>,
bootstrap: boolean
) => {
let entry = rscCache.get(id)
if (!entry) {
const [renderStream, forwardStream] = readableStreamTee(req)
entry = createFromReadableStream(
pipeThrough(renderStream, createTextEncoderStream())
)
entry = createFromReadableStream(renderStream)
rscCache.set(id, entry)

let bootstrapped = false
Expand All @@ -325,22 +323,23 @@ function createRSCHook() {
if (bootstrap && !bootstrapped) {
bootstrapped = true
writer.write(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify([
0,
id,
])})</script>`
encodeText(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify(
[0, id]
)})</script>`
)
)
}
if (done) {
rscCache.delete(id)
writer.close()
} else {
writer.write(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify([
1,
id,
value,
])})</script>`
encodeText(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify(
[1, id, decodeText(value)]
)})</script>`
)
)
process()
}
Expand All @@ -365,7 +364,7 @@ function createServerComponentRenderer(
runtime,
}: {
cachePrefix: string
transformStream: TransformStream<string, string>
transformStream: TransformStream<Uint8Array, Uint8Array>
serverComponentManifest: NonNullable<RenderOpts['serverComponentManifest']>
runtime: 'nodejs' | 'edge'
}
Expand All @@ -381,12 +380,9 @@ function createServerComponentRenderer(
const writable = transformStream.writable
const ServerComponentWrapper = (props: any) => {
const id = (React as any).useId()
const reqStream: ReadableStream<string> = pipeThrough(
renderToReadableStream(
renderFlight(App, OriginalComponent, props),
serverComponentManifest
),
createTextDecoderStream()
const reqStream: ReadableStream<Uint8Array> = renderToReadableStream(
renderFlight(App, OriginalComponent, props),
serverComponentManifest
)

const response = useRSCResponse(
Expand Down Expand Up @@ -482,8 +478,8 @@ export async function renderToHTML(
let Component: React.ComponentType<{}> | ((props: any) => JSX.Element) =
renderOpts.Component
let serverComponentsInlinedTransformStream: TransformStream<
string,
string
Uint8Array,
Uint8Array
> | null = null

if (isServerComponent) {
Expand Down Expand Up @@ -1181,21 +1177,16 @@ export async function renderToHTML(
if (isResSent(res) && !isSSG) return null

if (renderServerComponentData) {
const stream: ReadableStream<string> = pipeThrough(
renderToReadableStream(
renderFlight(App, OriginalComponent, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
),
createTextDecoderStream()
const stream: ReadableStream<Uint8Array> = renderToReadableStream(
renderFlight(App, OriginalComponent, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
)

return new RenderResult(
pipeThrough(
pipeThrough(stream, createBufferedTransformStream()),
createTextEncoderStream()
)
pipeThrough(stream, createBufferedTransformStream())
)
}

Expand Down Expand Up @@ -1360,7 +1351,8 @@ export async function renderToHTML(
generateStaticHTML: true,
})

return await streamToString(flushEffectStream)
const flushed = await streamToString(flushEffectStream)
return flushed
}

return await renderToStream({
Expand Down Expand Up @@ -1607,9 +1599,7 @@ export async function renderToHTML(
return new RenderResult(html)
}

return new RenderResult(
pipeThrough(chainStreams(streams), createTextEncoderStream())
)
return new RenderResult(chainStreams(streams))
}

function errorToJSON(err: Error) {
Expand Down Expand Up @@ -1707,35 +1697,18 @@ function createTransformStream<Input, Output>({
}
}

function createTextDecoderStream(): TransformStream<Uint8Array, string> {
const decoder = new TextDecoder()
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(
typeof chunk === 'string' ? chunk : decoder.decode(chunk)
)
},
})
}

function createTextEncoderStream(): TransformStream<string, Uint8Array> {
const encoder = new TextEncoder()
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(encoder.encode(chunk))
},
})
}

function createBufferedTransformStream(): TransformStream<string, string> {
function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let bufferedString = ''
let pendingFlush: Promise<void> | null = null

const flushBuffer = (controller: TransformStreamDefaultController) => {
if (!pendingFlush) {
pendingFlush = new Promise((resolve) => {
setTimeout(() => {
controller.enqueue(bufferedString)
controller.enqueue(encodeText(bufferedString))
bufferedString = ''
pendingFlush = null
resolve()
Expand All @@ -1747,7 +1720,7 @@ function createBufferedTransformStream(): TransformStream<string, string> {

return createTransformStream({
transform(chunk, controller) {
bufferedString += chunk
bufferedString += decodeText(chunk)
flushBuffer(controller)
},

Expand All @@ -1761,11 +1734,11 @@ function createBufferedTransformStream(): TransformStream<string, string> {

function createFlushEffectStream(
handleFlushEffect: () => Promise<string>
): TransformStream<string, string> {
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
async transform(chunk, controller) {
const extraChunk = await handleFlushEffect()
controller.enqueue(extraChunk + chunk)
controller.enqueue(encodeText(extraChunk + decodeText(chunk)))
},
})
}
Expand All @@ -1781,10 +1754,10 @@ function renderToStream({
ReactDOMServer: typeof import('react-dom/server')
element: React.ReactElement
suffix?: string
dataStream?: ReadableStream<string>
dataStream?: ReadableStream<Uint8Array>
generateStaticHTML: boolean
flushEffectHandler?: () => Promise<string>
}): Promise<ReadableStream<string>> {
}): Promise<ReadableStream<Uint8Array>> {
return new Promise((resolve, reject) => {
let resolved = false

Expand All @@ -1799,7 +1772,7 @@ function renderToStream({
// defer to a microtask to ensure `stream` is set.
resolve(
Promise.resolve().then(() => {
const transforms: Array<TransformStream<string, string>> = [
const transforms: Array<TransformStream<Uint8Array, Uint8Array>> = [
createBufferedTransformStream(),
flushEffectHandler
? createFlushEffectStream(flushEffectHandler)
Expand All @@ -1820,61 +1793,73 @@ function renderToStream({
}
}

const renderStream = pipeThrough(
(ReactDOMServer as any).renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
}
},
onCompleteAll() {
const renderStream: ReadableStream<Uint8Array> = (
ReactDOMServer as any
).renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
},
}),
createTextDecoderStream()
)
}
},
onCompleteAll() {
doResolve()
},
})
})
}

function createSuffixStream(suffix: string): TransformStream<string, string> {
function encodeText(input: string) {
return new TextEncoder().encode(input)
}

function decodeText(input?: Uint8Array) {
return new TextDecoder().decode(input)
}

function createSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
flush(controller) {
if (suffix) {
controller.enqueue(suffix)
controller.enqueue(encodeText(suffix))
}
},
})
}

function createPrefixStream(prefix: string): TransformStream<string, string> {
function createPrefixStream(
prefix: string
): TransformStream<Uint8Array, Uint8Array> {
let prefixFlushed = false
return createTransformStream({
transform(chunk, controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(chunk + prefix)
controller.enqueue(chunk)
controller.enqueue(encodeText(prefix))
} else {
controller.enqueue(chunk)
}
},
flush(controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(prefix)
controller.enqueue(encodeText(prefix))
}
},
})
}

function createInlineDataStream(
dataStream: ReadableStream<string>
): TransformStream<string, string> {
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
transform(chunk, controller) {
Expand Down Expand Up @@ -1966,19 +1951,21 @@ function chainStreams<T>(streams: ReadableStream<T>[]): ReadableStream<T> {
return readable
}

function streamFromArray(strings: string[]): ReadableStream<string> {
function streamFromArray(strings: string[]): ReadableStream<Uint8Array> {
// Note: we use a TransformStream here instead of instantiating a ReadableStream
// because the built-in ReadableStream polyfill runs strings through TextEncoder.
const { readable, writable } = new TransformStream()

const writer = writable.getWriter()
strings.forEach((str) => writer.write(str))
strings.forEach((str) => writer.write(encodeText(str)))
writer.close()

return readable
}

async function streamToString(stream: ReadableStream<string>): Promise<string> {
async function streamToString(
stream: ReadableStream<Uint8Array>
): Promise<string> {
const reader = stream.getReader()
let bufferedString = ''

Expand All @@ -1989,6 +1976,6 @@ async function streamToString(stream: ReadableStream<string>): Promise<string> {
return bufferedString
}

bufferedString += value
bufferedString += decodeText(value)
}
}