Skip to content

Commit

Permalink
Polyfill pipeTo and pipeThrough (vercel#34112)
Browse files Browse the repository at this point in the history
* polyfill pipeTo

* add pipeThrough

* covert pipeThrough calls

* use pipe

* invert logic

Co-authored-by: Jiachi Liu <inbox@huozhi.im>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and natew committed Feb 16, 2022
1 parent 538c74a commit a06db16
Showing 1 changed file with 49 additions and 8 deletions.
57 changes: 49 additions & 8 deletions packages/next/server/render.tsx
Expand Up @@ -1133,7 +1133,9 @@ export async function renderToHTML(
}),
serverComponentManifest
)
return new RenderResult(stream.pipeThrough(createBufferedTransformStream()))
return new RenderResult(
pipeThrough(stream, createBufferedTransformStream())
)
}

// we preload the buildManifest for auto-export dynamic pages
Expand Down Expand Up @@ -1675,16 +1677,18 @@ function renderToStream(
// defer to a microtask to ensure `stream` is set.
Promise.resolve().then(() =>
resolve(
stream
.pipeThrough(createBufferedTransformStream())
.pipeThrough(
pipeThrough(
pipeThrough(
pipeThrough(stream, createBufferedTransformStream()),
createInlineDataStream(
dataStream.pipeThrough(
pipeThrough(
dataStream,
createPrefixStream(suffixState?.suffixUnclosed ?? null)
)
)
)
.pipeThrough(createSuffixStream(suffixState?.closeTag ?? null))
),
createSuffixStream(suffixState?.closeTag ?? null)
)
)
)
}
Expand Down Expand Up @@ -1773,13 +1777,50 @@ function createInlineDataStream(
})
}

function pipeTo(
readable: ReadableStream,
writable: WritableStream,
options?: { preventClose: boolean }
) {
let resolver: () => void
const promise = new Promise<void>((resolve) => (resolver = resolve))

const reader = readable.getReader()
const writer = writable.getWriter()
function process() {
reader.read().then(({ done, value }) => {
if (done) {
if (options?.preventClose) {
writer.releaseLock()
} else {
writer.close()
}
resolver()
} else {
writer.write(value)
process()
}
})
}
process()
return promise
}

function pipeThrough(
readable: ReadableStream,
transformStream: TransformStream
) {
pipeTo(readable, transformStream.writable)
return transformStream.readable
}

function chainStreams(streams: ReadableStream[]): ReadableStream {
const { readable, writable } = new TransformStream()

let promise = Promise.resolve()
for (let i = 0; i < streams.length; ++i) {
promise = promise.then(() =>
streams[i].pipeTo(writable, {
pipeTo(streams[i], writable, {
preventClose: i + 1 < streams.length,
})
)
Expand Down

0 comments on commit a06db16

Please sign in to comment.