Skip to content

Commit

Permalink
Fix only byte stream writing is allowed in CF workers (#34893)
Browse files Browse the repository at this point in the history
CF worker doesn't allow to use `writer.write(string)` but only byte stream, we have to transform the Uint8Array stream

![image](https://user-images.githubusercontent.com/4800338/156043536-25fcdb15-3f69-427e-9e31-97169609eb7a.png)
  • Loading branch information
huozhi committed Feb 28, 2022
1 parent 7ca78dd commit 8e3b6fc
Showing 1 changed file with 84 additions and 97 deletions.
181 changes: 84 additions & 97 deletions packages/next/server/render.tsx
Expand Up @@ -304,17 +304,15 @@ const rscCache = new Map()

function createRSCHook() {
return (
writable: WritableStream<string>,
writable: WritableStream<Uint8Array>,
id: string,
req: ReadableStream<string>,
req: ReadableStream<Uint8Array>,
bootstrap: boolean
) => {
let entry = rscCache.get(id)
if (!entry) {
const [renderStream, forwardStream] = readableStreamTee(req)
entry = createFromReadableStream(
pipeThrough(renderStream, createTextEncoderStream())
)
entry = createFromReadableStream(renderStream)
rscCache.set(id, entry)

let bootstrapped = false
Expand All @@ -325,22 +323,23 @@ function createRSCHook() {
if (bootstrap && !bootstrapped) {
bootstrapped = true
writer.write(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify([
0,
id,
])})</script>`
encodeText(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify(
[0, id]
)})</script>`
)
)
}
if (done) {
rscCache.delete(id)
writer.close()
} else {
writer.write(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify([
1,
id,
value,
])})</script>`
encodeText(
`<script>(self.__next_s=self.__next_s||[]).push(${JSON.stringify(
[1, id, decodeText(value)]
)})</script>`
)
)
process()
}
Expand All @@ -365,7 +364,7 @@ function createServerComponentRenderer(
runtime,
}: {
cachePrefix: string
transformStream: TransformStream<string, string>
transformStream: TransformStream<Uint8Array, Uint8Array>
serverComponentManifest: NonNullable<RenderOpts['serverComponentManifest']>
runtime: 'nodejs' | 'edge'
}
Expand All @@ -381,12 +380,9 @@ function createServerComponentRenderer(
const writable = transformStream.writable
const ServerComponentWrapper = (props: any) => {
const id = (React as any).useId()
const reqStream: ReadableStream<string> = pipeThrough(
renderToReadableStream(
renderFlight(App, OriginalComponent, props),
serverComponentManifest
),
createTextDecoderStream()
const reqStream: ReadableStream<Uint8Array> = renderToReadableStream(
renderFlight(App, OriginalComponent, props),
serverComponentManifest
)

const response = useRSCResponse(
Expand Down Expand Up @@ -482,8 +478,8 @@ export async function renderToHTML(
let Component: React.ComponentType<{}> | ((props: any) => JSX.Element) =
renderOpts.Component
let serverComponentsInlinedTransformStream: TransformStream<
string,
string
Uint8Array,
Uint8Array
> | null = null

if (isServerComponent) {
Expand Down Expand Up @@ -1181,21 +1177,16 @@ export async function renderToHTML(
if (isResSent(res) && !isSSG) return null

if (renderServerComponentData) {
const stream: ReadableStream<string> = pipeThrough(
renderToReadableStream(
renderFlight(App, OriginalComponent, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
),
createTextDecoderStream()
const stream: ReadableStream<Uint8Array> = renderToReadableStream(
renderFlight(App, OriginalComponent, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
)

return new RenderResult(
pipeThrough(
pipeThrough(stream, createBufferedTransformStream()),
createTextEncoderStream()
)
pipeThrough(stream, createBufferedTransformStream())
)
}

Expand Down Expand Up @@ -1360,7 +1351,8 @@ export async function renderToHTML(
generateStaticHTML: true,
})

return await streamToString(flushEffectStream)
const flushed = await streamToString(flushEffectStream)
return flushed
}

return await renderToStream({
Expand Down Expand Up @@ -1607,9 +1599,7 @@ export async function renderToHTML(
return new RenderResult(html)
}

return new RenderResult(
pipeThrough(chainStreams(streams), createTextEncoderStream())
)
return new RenderResult(chainStreams(streams))
}

function errorToJSON(err: Error) {
Expand Down Expand Up @@ -1707,35 +1697,18 @@ function createTransformStream<Input, Output>({
}
}

function createTextDecoderStream(): TransformStream<Uint8Array, string> {
const decoder = new TextDecoder()
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(
typeof chunk === 'string' ? chunk : decoder.decode(chunk)
)
},
})
}

function createTextEncoderStream(): TransformStream<string, Uint8Array> {
const encoder = new TextEncoder()
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(encoder.encode(chunk))
},
})
}

function createBufferedTransformStream(): TransformStream<string, string> {
function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let bufferedString = ''
let pendingFlush: Promise<void> | null = null

const flushBuffer = (controller: TransformStreamDefaultController) => {
if (!pendingFlush) {
pendingFlush = new Promise((resolve) => {
setTimeout(() => {
controller.enqueue(bufferedString)
controller.enqueue(encodeText(bufferedString))
bufferedString = ''
pendingFlush = null
resolve()
Expand All @@ -1747,7 +1720,7 @@ function createBufferedTransformStream(): TransformStream<string, string> {

return createTransformStream({
transform(chunk, controller) {
bufferedString += chunk
bufferedString += decodeText(chunk)
flushBuffer(controller)
},

Expand All @@ -1761,11 +1734,11 @@ function createBufferedTransformStream(): TransformStream<string, string> {

function createFlushEffectStream(
handleFlushEffect: () => Promise<string>
): TransformStream<string, string> {
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
async transform(chunk, controller) {
const extraChunk = await handleFlushEffect()
controller.enqueue(extraChunk + chunk)
controller.enqueue(encodeText(extraChunk + decodeText(chunk)))
},
})
}
Expand All @@ -1781,10 +1754,10 @@ function renderToStream({
ReactDOMServer: typeof import('react-dom/server')
element: React.ReactElement
suffix?: string
dataStream?: ReadableStream<string>
dataStream?: ReadableStream<Uint8Array>
generateStaticHTML: boolean
flushEffectHandler?: () => Promise<string>
}): Promise<ReadableStream<string>> {
}): Promise<ReadableStream<Uint8Array>> {
return new Promise((resolve, reject) => {
let resolved = false

Expand All @@ -1799,7 +1772,7 @@ function renderToStream({
// defer to a microtask to ensure `stream` is set.
resolve(
Promise.resolve().then(() => {
const transforms: Array<TransformStream<string, string>> = [
const transforms: Array<TransformStream<Uint8Array, Uint8Array>> = [
createBufferedTransformStream(),
flushEffectHandler
? createFlushEffectStream(flushEffectHandler)
Expand All @@ -1820,61 +1793,73 @@ function renderToStream({
}
}

const renderStream = pipeThrough(
(ReactDOMServer as any).renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
}
},
onCompleteAll() {
const renderStream: ReadableStream<Uint8Array> = (
ReactDOMServer as any
).renderToReadableStream(element, {
onError(err: Error) {
if (!resolved) {
resolved = true
reject(err)
}
},
onCompleteShell() {
if (!generateStaticHTML) {
doResolve()
},
}),
createTextDecoderStream()
)
}
},
onCompleteAll() {
doResolve()
},
})
})
}

function createSuffixStream(suffix: string): TransformStream<string, string> {
function encodeText(input: string) {
return new TextEncoder().encode(input)
}

function decodeText(input?: Uint8Array) {
return new TextDecoder().decode(input)
}

function createSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
flush(controller) {
if (suffix) {
controller.enqueue(suffix)
controller.enqueue(encodeText(suffix))
}
},
})
}

function createPrefixStream(prefix: string): TransformStream<string, string> {
function createPrefixStream(
prefix: string
): TransformStream<Uint8Array, Uint8Array> {
let prefixFlushed = false
return createTransformStream({
transform(chunk, controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(chunk + prefix)
controller.enqueue(chunk)
controller.enqueue(encodeText(prefix))
} else {
controller.enqueue(chunk)
}
},
flush(controller) {
if (!prefixFlushed && prefix) {
prefixFlushed = true
controller.enqueue(prefix)
controller.enqueue(encodeText(prefix))
}
},
})
}

function createInlineDataStream(
dataStream: ReadableStream<string>
): TransformStream<string, string> {
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
transform(chunk, controller) {
Expand Down Expand Up @@ -1966,19 +1951,21 @@ function chainStreams<T>(streams: ReadableStream<T>[]): ReadableStream<T> {
return readable
}

function streamFromArray(strings: string[]): ReadableStream<string> {
function streamFromArray(strings: string[]): ReadableStream<Uint8Array> {
// Note: we use a TransformStream here instead of instantiating a ReadableStream
// because the built-in ReadableStream polyfill runs strings through TextEncoder.
const { readable, writable } = new TransformStream()

const writer = writable.getWriter()
strings.forEach((str) => writer.write(str))
strings.forEach((str) => writer.write(encodeText(str)))
writer.close()

return readable
}

async function streamToString(stream: ReadableStream<string>): Promise<string> {
async function streamToString(
stream: ReadableStream<Uint8Array>
): Promise<string> {
const reader = stream.getReader()
let bufferedString = ''

Expand All @@ -1989,6 +1976,6 @@ async function streamToString(stream: ReadableStream<string>): Promise<string> {
return bufferedString
}

bufferedString += value
bufferedString += decodeText(value)
}
}

0 comments on commit 8e3b6fc

Please sign in to comment.