Skip to content

Commit

Permalink
Use rendering utils from vercel#35372
Browse files Browse the repository at this point in the history
  • Loading branch information
timneutkens committed Mar 18, 2022
1 parent 1082c26 commit 28ca741
Showing 1 changed file with 11 additions and 284 deletions.
295 changes: 11 additions & 284 deletions packages/next/server/root-render.tsx
Expand Up @@ -8,7 +8,17 @@ import { renderToReadableStream } from 'next/dist/compiled/react-server-dom-webp
import { StyleRegistry, createStyleRegistry } from 'styled-jsx'
import { NextParsedUrlQuery } from './request-meta'
import RenderResult from './render-result'
import { readableStreamTee } from './web/utils'
import {
readableStreamTee,
encodeText,
decodeText,
pipeThrough,
streamFromArray,
streamToString,
chainStreams,
createBufferedTransformStream,
renderToStream,
} from './node-web-streams-helper'
import { FlushEffectsContext } from '../shared/lib/flush-effects'

const DOCTYPE = '<!DOCTYPE html>'
Expand Down Expand Up @@ -343,286 +353,3 @@ export async function renderToHTML(

return new RenderResult(chainStreams(streams))
}

function createTransformStream<Input, Output>({
flush,
transform,
}: {
flush?: (
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
transform?: (
chunk: Input,
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
}): TransformStream<Input, Output> {
const source = new TransformStream()
const sink = new TransformStream()
const reader = source.readable.getReader()
const writer = sink.writable.getWriter()

const controller = {
enqueue(chunk: Output) {
writer.write(chunk)
},

error(reason: Error) {
writer.abort(reason)
reader.cancel()
},

terminate() {
writer.close()
reader.cancel()
},

get desiredSize() {
return writer.desiredSize
},
}

;(async () => {
try {
while (true) {
const { done, value } = await reader.read()

if (done) {
const maybePromise = flush?.(controller)
if (maybePromise) {
await maybePromise
}
writer.close()
return
}

if (transform) {
const maybePromise = transform(value, controller)
if (maybePromise) {
await maybePromise
}
} else {
controller.enqueue(value)
}
}
} catch (err) {
writer.abort(err)
}
})()

return {
readable: sink.readable,
writable: source.writable,
}
}

function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let bufferedString = ''
let pendingFlush: Promise<void> | null = null

const flushBuffer = (controller: TransformStreamDefaultController) => {
if (!pendingFlush) {
pendingFlush = new Promise((resolve) => {
setTimeout(() => {
controller.enqueue(encodeText(bufferedString))
bufferedString = ''
pendingFlush = null
resolve()
}, 0)
})
}
return pendingFlush
}

return createTransformStream({
transform(chunk, controller) {
bufferedString += decodeText(chunk)
flushBuffer(controller)
},

flush() {
if (pendingFlush) {
return pendingFlush
}
},
})
}

function createFlushEffectStream(
handleFlushEffect: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
async transform(chunk, controller) {
const extraChunk = await handleFlushEffect()
// those should flush together at once
controller.enqueue(encodeText(extraChunk + decodeText(chunk)))
},
})
}

async function renderToStream({
ReactDOMServer,
element,
dataStream,
generateStaticHTML,
flushEffectHandler,
}: {
ReactDOMServer: typeof import('react-dom/server')
element: React.ReactElement
dataStream?: ReadableStream<Uint8Array>
generateStaticHTML: boolean
flushEffectHandler?: () => Promise<string>
}): Promise<ReadableStream<Uint8Array>> {
const renderStream: ReadableStream<Uint8Array> & {
allReady?: Promise<void>
} = await (ReactDOMServer as any).renderToReadableStream(element)

if (generateStaticHTML) {
await renderStream.allReady
}

const transforms: Array<TransformStream<Uint8Array, Uint8Array>> = [
createBufferedTransformStream(),
flushEffectHandler ? createFlushEffectStream(flushEffectHandler) : null,
dataStream ? createInlineDataStream(dataStream) : null,
].filter(Boolean) as any

return transforms.reduce(
(readable, transform) => pipeThrough(readable, transform),
renderStream
)
}

function encodeText(input: string) {
return new TextEncoder().encode(input)
}

function decodeText(input?: Uint8Array) {
return new TextDecoder().decode(input)
}

function createInlineDataStream(
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)

if (!dataStreamFinished) {
const dataStreamReader = dataStream.getReader()

// NOTE: streaming flush
// We are buffering here for the inlined data stream because the
// "shell" stream might be chunkenized again by the underlying stream
// implementation, e.g. with a specific high-water mark. To ensure it's
// the safe timing to pipe the data stream, this extra tick is
// necessary.
dataStreamFinished = new Promise((res) =>
setTimeout(async () => {
try {
while (true) {
const { done, value } = await dataStreamReader.read()
if (done) {
return res()
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
}
res()
}, 0)
)
}
},
flush() {
if (dataStreamFinished) {
return dataStreamFinished
}
},
})
}

function pipeTo<T>(
readable: ReadableStream<T>,
writable: WritableStream<T>,
options?: { preventClose: boolean }
) {
let resolver: () => void
const promise = new Promise<void>((resolve) => (resolver = resolve))

const reader = readable.getReader()
const writer = writable.getWriter()
function process() {
reader.read().then(({ done, value }) => {
if (done) {
if (options?.preventClose) {
writer.releaseLock()
} else {
writer.close()
}
resolver()
} else {
writer.write(value)
process()
}
})
}
process()
return promise
}

function pipeThrough<Input, Output>(
readable: ReadableStream<Input>,
transformStream: TransformStream<Input, Output>
) {
pipeTo(readable, transformStream.writable)
return transformStream.readable
}

function chainStreams<T>(streams: ReadableStream<T>[]): ReadableStream<T> {
const { readable, writable } = new TransformStream()

let promise = Promise.resolve()
for (let i = 0; i < streams.length; ++i) {
promise = promise.then(() =>
pipeTo(streams[i], writable, {
preventClose: i + 1 < streams.length,
})
)
}

return readable
}

function streamFromArray(strings: string[]): ReadableStream<Uint8Array> {
// Note: we use a TransformStream here instead of instantiating a ReadableStream
// because the built-in ReadableStream polyfill runs strings through TextEncoder.
const { readable, writable } = new TransformStream()

const writer = writable.getWriter()
strings.forEach((str) => writer.write(encodeText(str)))
writer.close()

return readable
}

async function streamToString(
stream: ReadableStream<Uint8Array>
): Promise<string> {
const reader = stream.getReader()
let bufferedString = ''

while (true) {
const { done, value } = await reader.read()

if (done) {
return bufferedString
}

bufferedString += decodeText(value)
}
}

0 comments on commit 28ca741

Please sign in to comment.