diff --git a/.github/workflows/build_test_deploy.yml b/.github/workflows/build_test_deploy.yml index bf9b28cf3a04..7937f7a0ec66 100644 --- a/.github/workflows/build_test_deploy.yml +++ b/.github/workflows/build_test_deploy.yml @@ -208,12 +208,16 @@ jobs: env: NEXT_TELEMETRY_DISABLED: 1 NEXT_TEST_JOB: 1 + strategy: + fail-fast: false + matrix: + node: [16, 17] steps: - name: Setup node uses: actions/setup-node@v2 if: ${{needs.build.outputs.docsChange != 'docs only change'}} with: - node-version: 14 + node-version: ${{ matrix.node }} - run: echo ${{needs.build.outputs.docsChange}} @@ -258,12 +262,16 @@ jobs: env: NEXT_TELEMETRY_DISABLED: 1 NEXT_TEST_JOB: 1 + strategy: + fail-fast: false + matrix: + node: [16, 17] steps: - name: Setup node uses: actions/setup-node@v2 if: ${{needs.build.outputs.docsChange != 'docs only change'}} with: - node-version: 14 + node-version: ${{ matrix.node }} - run: echo ${{needs.build.outputs.docsChange}} @@ -308,12 +316,16 @@ jobs: env: NEXT_TELEMETRY_DISABLED: 1 NEXT_TEST_JOB: 1 + strategy: + fail-fast: false + matrix: + node: [16, 17] steps: - name: Setup node uses: actions/setup-node@v2 if: ${{needs.build.outputs.docsChange != 'docs only change'}} with: - node-version: 14 + node-version: ${{ matrix.node }} - run: echo ${{needs.build.outputs.docsChange}} @@ -348,12 +360,16 @@ jobs: env: NEXT_TELEMETRY_DISABLED: 1 NEXT_TEST_JOB: 1 + strategy: + fail-fast: false + matrix: + node: [16, 17] steps: - name: Setup node uses: actions/setup-node@v2 if: ${{needs.build.outputs.docsChange != 'docs only change'}} with: - node-version: 14 + node-version: ${{ matrix.node }} - run: echo ${{needs.build.outputs.docsChange}} diff --git a/packages/next/server/node-web-streams-helper.ts b/packages/next/server/node-web-streams-helper.ts new file mode 100644 index 000000000000..b8b1cc6ff98d --- /dev/null +++ b/packages/next/server/node-web-streams-helper.ts @@ -0,0 +1,359 @@ +export function readableStreamTee( + readable: ReadableStream +): [ReadableStream, ReadableStream] { + const transformStream = new TransformStream() + const transformStream2 = new TransformStream() + const writer = transformStream.writable.getWriter() + const writer2 = transformStream2.writable.getWriter() + + const reader = readable.getReader() + function read() { + reader.read().then(({ done, value }) => { + if (done) { + writer.close() + writer2.close() + return + } + writer.write(value) + writer2.write(value) + read() + }) + } + read() + + return [transformStream.readable, transformStream2.readable] +} + +export function pipeTo( + readable: ReadableStream, + writable: WritableStream, + options?: { preventClose: boolean } +) { + let resolver: () => void + const promise = new Promise((resolve) => (resolver = resolve)) + + const reader = readable.getReader() + const writer = writable.getWriter() + function process() { + reader.read().then(({ done, value }) => { + if (done) { + if (options?.preventClose) { + writer.releaseLock() + } else { + writer.close() + } + resolver() + } else { + writer.write(value) + process() + } + }) + } + process() + return promise +} + +export function pipeThrough( + readable: ReadableStream, + transformStream: TransformStream +) { + pipeTo(readable, transformStream.writable) + return transformStream.readable +} + +export function chainStreams( + streams: ReadableStream[] +): ReadableStream { + const { readable, writable } = new TransformStream() + + let promise = Promise.resolve() + for (let i = 0; i < streams.length; ++i) { + promise = promise.then(() => + pipeTo(streams[i], writable, { + preventClose: i + 1 < streams.length, + }) + ) + } + + return readable +} + +export function streamFromArray(strings: string[]): ReadableStream { + // Note: we use a TransformStream here instead of instantiating a ReadableStream + // because the built-in ReadableStream polyfill runs strings through TextEncoder. + const { readable, writable } = new TransformStream() + + const writer = writable.getWriter() + strings.forEach((str) => writer.write(encodeText(str))) + writer.close() + + return readable +} + +export async function streamToString( + stream: ReadableStream +): Promise { + const reader = stream.getReader() + let bufferedString = '' + + while (true) { + const { done, value } = await reader.read() + + if (done) { + return bufferedString + } + + bufferedString += decodeText(value) + } +} + +export function encodeText(input: string) { + return new TextEncoder().encode(input) +} + +export function decodeText(input?: Uint8Array) { + return new TextDecoder().decode(input) +} + +export function createTransformStream({ + flush, + transform, +}: { + flush?: ( + controller: TransformStreamDefaultController + ) => Promise | void + transform?: ( + chunk: Input, + controller: TransformStreamDefaultController + ) => Promise | void +}): TransformStream { + const source = new TransformStream() + const sink = new TransformStream() + const reader = source.readable.getReader() + const writer = sink.writable.getWriter() + + const controller = { + enqueue(chunk: Output) { + writer.write(chunk) + }, + + error(reason: Error) { + writer.abort(reason) + reader.cancel() + }, + + terminate() { + writer.close() + reader.cancel() + }, + + get desiredSize() { + return writer.desiredSize + }, + } + + ;(async () => { + try { + while (true) { + const { done, value } = await reader.read() + + if (done) { + const maybePromise = flush?.(controller) + if (maybePromise) { + await maybePromise + } + writer.close() + return + } + + if (transform) { + const maybePromise = transform(value, controller) + if (maybePromise) { + await maybePromise + } + } else { + controller.enqueue(value) + } + } + } catch (err) { + writer.abort(err) + } + })() + + return { + readable: sink.readable, + writable: source.writable, + } +} + +export function createBufferedTransformStream(): TransformStream< + Uint8Array, + Uint8Array +> { + let bufferedString = '' + let pendingFlush: Promise | null = null + + const flushBuffer = (controller: TransformStreamDefaultController) => { + if (!pendingFlush) { + pendingFlush = new Promise((resolve) => { + setTimeout(() => { + controller.enqueue(encodeText(bufferedString)) + bufferedString = '' + pendingFlush = null + resolve() + }, 0) + }) + } + return pendingFlush + } + + return createTransformStream({ + transform(chunk, controller) { + bufferedString += decodeText(chunk) + flushBuffer(controller) + }, + + flush() { + if (pendingFlush) { + return pendingFlush + } + }, + }) +} + +export function createFlushEffectStream( + handleFlushEffect: () => Promise +): TransformStream { + return createTransformStream({ + async transform(chunk, controller) { + const extraChunk = await handleFlushEffect() + // those should flush together at once + controller.enqueue(encodeText(extraChunk + decodeText(chunk))) + }, + }) +} + +export async function renderToStream({ + ReactDOMServer, + element, + suffix, + dataStream, + generateStaticHTML, + flushEffectHandler, +}: { + ReactDOMServer: typeof import('react-dom/server') + element: React.ReactElement + suffix?: string + dataStream?: ReadableStream + generateStaticHTML: boolean + flushEffectHandler?: () => Promise +}): Promise> { + const closeTag = '' + const suffixUnclosed = suffix ? suffix.split(closeTag)[0] : null + const renderStream: ReadableStream & { + allReady?: Promise + } = await (ReactDOMServer as any).renderToReadableStream(element) + + if (generateStaticHTML) { + await renderStream.allReady + } + + const transforms: Array> = [ + createBufferedTransformStream(), + flushEffectHandler ? createFlushEffectStream(flushEffectHandler) : null, + suffixUnclosed != null ? createPrefixStream(suffixUnclosed) : null, + dataStream ? createInlineDataStream(dataStream) : null, + suffixUnclosed != null ? createSuffixStream(closeTag) : null, + ].filter(Boolean) as any + + return transforms.reduce( + (readable, transform) => pipeThrough(readable, transform), + renderStream + ) +} + +export function createSuffixStream( + suffix: string +): TransformStream { + return createTransformStream({ + flush(controller) { + if (suffix) { + controller.enqueue(encodeText(suffix)) + } + }, + }) +} + +export function createPrefixStream( + prefix: string +): TransformStream { + let prefixFlushed = false + let prefixPrefixFlushFinished: Promise | null = null + return createTransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk) + if (!prefixFlushed && prefix) { + prefixFlushed = true + prefixPrefixFlushFinished = new Promise((res) => { + // NOTE: streaming flush + // Enqueue prefix part before the major chunks are enqueued so that + // prefix won't be flushed too early to interrupt the data stream + setTimeout(() => { + controller.enqueue(encodeText(prefix)) + res() + }) + }) + } + }, + flush(controller) { + if (prefixPrefixFlushFinished) return prefixPrefixFlushFinished + if (!prefixFlushed && prefix) { + prefixFlushed = true + controller.enqueue(encodeText(prefix)) + } + }, + }) +} + +export function createInlineDataStream( + dataStream: ReadableStream +): TransformStream { + let dataStreamFinished: Promise | null = null + return createTransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk) + + if (!dataStreamFinished) { + const dataStreamReader = dataStream.getReader() + + // NOTE: streaming flush + // We are buffering here for the inlined data stream because the + // "shell" stream might be chunkenized again by the underlying stream + // implementation, e.g. with a specific high-water mark. To ensure it's + // the safe timing to pipe the data stream, this extra tick is + // necessary. + dataStreamFinished = new Promise((res) => + setTimeout(async () => { + try { + while (true) { + const { done, value } = await dataStreamReader.read() + if (done) { + return res() + } + controller.enqueue(value) + } + } catch (err) { + controller.error(err) + } + res() + }, 0) + ) + } + }, + flush() { + if (dataStreamFinished) { + return dataStreamFinished + } + }, + }) +} diff --git a/packages/next/server/render.tsx b/packages/next/server/render.tsx index e80cf62991cc..c13d5ff65c12 100644 --- a/packages/next/server/render.tsx +++ b/packages/next/server/render.tsx @@ -64,7 +64,17 @@ import { } from '../lib/load-custom-routes' import RenderResult from './render-result' import isError from '../lib/is-error' -import { readableStreamTee } from './web/utils' +import { + readableStreamTee, + encodeText, + decodeText, + pipeThrough, + streamFromArray, + streamToString, + chainStreams, + createBufferedTransformStream, + renderToStream, +} from './node-web-streams-helper' import { ImageConfigContext } from '../shared/lib/image-config-context' import { FlushEffectsContext } from '../shared/lib/flush-effects' @@ -1637,335 +1647,3 @@ function serializeError( statusCode: 500, } } - -function createTransformStream({ - flush, - transform, -}: { - flush?: ( - controller: TransformStreamDefaultController - ) => Promise | void - transform?: ( - chunk: Input, - controller: TransformStreamDefaultController - ) => Promise | void -}): TransformStream { - const source = new TransformStream() - const sink = new TransformStream() - const reader = source.readable.getReader() - const writer = sink.writable.getWriter() - - const controller = { - enqueue(chunk: Output) { - writer.write(chunk) - }, - - error(reason: Error) { - writer.abort(reason) - reader.cancel() - }, - - terminate() { - writer.close() - reader.cancel() - }, - - get desiredSize() { - return writer.desiredSize - }, - } - - ;(async () => { - try { - while (true) { - const { done, value } = await reader.read() - - if (done) { - const maybePromise = flush?.(controller) - if (maybePromise) { - await maybePromise - } - writer.close() - return - } - - if (transform) { - const maybePromise = transform(value, controller) - if (maybePromise) { - await maybePromise - } - } else { - controller.enqueue(value) - } - } - } catch (err) { - writer.abort(err) - } - })() - - return { - readable: sink.readable, - writable: source.writable, - } -} - -function createBufferedTransformStream(): TransformStream< - Uint8Array, - Uint8Array -> { - let bufferedString = '' - let pendingFlush: Promise | null = null - - const flushBuffer = (controller: TransformStreamDefaultController) => { - if (!pendingFlush) { - pendingFlush = new Promise((resolve) => { - setTimeout(() => { - controller.enqueue(encodeText(bufferedString)) - bufferedString = '' - pendingFlush = null - resolve() - }, 0) - }) - } - return pendingFlush - } - - return createTransformStream({ - transform(chunk, controller) { - bufferedString += decodeText(chunk) - flushBuffer(controller) - }, - - flush() { - if (pendingFlush) { - return pendingFlush - } - }, - }) -} - -function createFlushEffectStream( - handleFlushEffect: () => Promise -): TransformStream { - return createTransformStream({ - async transform(chunk, controller) { - const extraChunk = await handleFlushEffect() - // those should flush together at once - controller.enqueue(encodeText(extraChunk + decodeText(chunk))) - }, - }) -} - -async function renderToStream({ - ReactDOMServer, - element, - suffix, - dataStream, - generateStaticHTML, - flushEffectHandler, -}: { - ReactDOMServer: typeof import('react-dom/server') - element: React.ReactElement - suffix?: string - dataStream?: ReadableStream - generateStaticHTML: boolean - flushEffectHandler?: () => Promise -}): Promise> { - const closeTag = '' - const suffixUnclosed = suffix ? suffix.split(closeTag)[0] : null - const renderStream: ReadableStream & { - allReady?: Promise - } = await (ReactDOMServer as any).renderToReadableStream(element) - - if (generateStaticHTML) { - await renderStream.allReady - } - - const transforms: Array> = [ - createBufferedTransformStream(), - flushEffectHandler ? createFlushEffectStream(flushEffectHandler) : null, - suffixUnclosed != null ? createPrefixStream(suffixUnclosed) : null, - dataStream ? createInlineDataStream(dataStream) : null, - suffixUnclosed != null ? createSuffixStream(closeTag) : null, - ].filter(Boolean) as any - - return transforms.reduce( - (readable, transform) => pipeThrough(readable, transform), - renderStream - ) -} - -function encodeText(input: string) { - return new TextEncoder().encode(input) -} - -function decodeText(input?: Uint8Array) { - return new TextDecoder().decode(input) -} - -function createSuffixStream( - suffix: string -): TransformStream { - return createTransformStream({ - flush(controller) { - if (suffix) { - controller.enqueue(encodeText(suffix)) - } - }, - }) -} - -function createPrefixStream( - prefix: string -): TransformStream { - let prefixFlushed = false - let prefixPrefixFlushFinished: Promise | null = null - return createTransformStream({ - transform(chunk, controller) { - controller.enqueue(chunk) - if (!prefixFlushed && prefix) { - prefixFlushed = true - prefixPrefixFlushFinished = new Promise((res) => { - // NOTE: streaming flush - // Enqueue prefix part before the major chunks are enqueued so that - // prefix won't be flushed too early to interrupt the data stream - setTimeout(() => { - controller.enqueue(encodeText(prefix)) - res() - }) - }) - } - }, - flush(controller) { - if (prefixPrefixFlushFinished) return prefixPrefixFlushFinished - if (!prefixFlushed && prefix) { - prefixFlushed = true - controller.enqueue(encodeText(prefix)) - } - }, - }) -} - -function createInlineDataStream( - dataStream: ReadableStream -): TransformStream { - let dataStreamFinished: Promise | null = null - return createTransformStream({ - transform(chunk, controller) { - controller.enqueue(chunk) - - if (!dataStreamFinished) { - const dataStreamReader = dataStream.getReader() - - // NOTE: streaming flush - // We are buffering here for the inlined data stream because the - // "shell" stream might be chunkenized again by the underlying stream - // implementation, e.g. with a specific high-water mark. To ensure it's - // the safe timing to pipe the data stream, this extra tick is - // necessary. - dataStreamFinished = new Promise((res) => - setTimeout(async () => { - try { - while (true) { - const { done, value } = await dataStreamReader.read() - if (done) { - return res() - } - controller.enqueue(value) - } - } catch (err) { - controller.error(err) - } - res() - }, 0) - ) - } - }, - flush() { - if (dataStreamFinished) { - return dataStreamFinished - } - }, - }) -} - -function pipeTo( - readable: ReadableStream, - writable: WritableStream, - options?: { preventClose: boolean } -) { - let resolver: () => void - const promise = new Promise((resolve) => (resolver = resolve)) - - const reader = readable.getReader() - const writer = writable.getWriter() - function process() { - reader.read().then(({ done, value }) => { - if (done) { - if (options?.preventClose) { - writer.releaseLock() - } else { - writer.close() - } - resolver() - } else { - writer.write(value) - process() - } - }) - } - process() - return promise -} - -function pipeThrough( - readable: ReadableStream, - transformStream: TransformStream -) { - pipeTo(readable, transformStream.writable) - return transformStream.readable -} - -function chainStreams(streams: ReadableStream[]): ReadableStream { - const { readable, writable } = new TransformStream() - - let promise = Promise.resolve() - for (let i = 0; i < streams.length; ++i) { - promise = promise.then(() => - pipeTo(streams[i], writable, { - preventClose: i + 1 < streams.length, - }) - ) - } - - return readable -} - -function streamFromArray(strings: string[]): ReadableStream { - // Note: we use a TransformStream here instead of instantiating a ReadableStream - // because the built-in ReadableStream polyfill runs strings through TextEncoder. - const { readable, writable } = new TransformStream() - - const writer = writable.getWriter() - strings.forEach((str) => writer.write(encodeText(str))) - writer.close() - - return readable -} - -async function streamToString( - stream: ReadableStream -): Promise { - const reader = stream.getReader() - let bufferedString = '' - - while (true) { - const { done, value } = await reader.read() - - if (done) { - return bufferedString - } - - bufferedString += decodeText(value) - } -} diff --git a/packages/next/server/web/utils.ts b/packages/next/server/web/utils.ts index a795ec35e144..864721bb3ab4 100644 --- a/packages/next/server/web/utils.ts +++ b/packages/next/server/web/utils.ts @@ -14,32 +14,6 @@ export async function* streamToIterator( reader.releaseLock() } -export function readableStreamTee( - readable: ReadableStream -): [ReadableStream, ReadableStream] { - const transformStream = new TransformStream() - const transformStream2 = new TransformStream() - const writer = transformStream.writable.getWriter() - const writer2 = transformStream2.writable.getWriter() - - const reader = readable.getReader() - function read() { - reader.read().then(({ done, value }) => { - if (done) { - writer.close() - writer2.close() - return - } - writer.write(value) - writer2.write(value) - read() - }) - } - read() - - return [transformStream.readable, transformStream2.readable] -} - export function notImplemented(name: string, method: string): any { throw new Error( `Failed to get the '${method}' property on '${name}': the property is not implemented` diff --git a/test/e2e/yarn-pnp/test/utils.ts b/test/e2e/yarn-pnp/test/utils.ts index 57ceb10c050e..976a339e5fb0 100644 --- a/test/e2e/yarn-pnp/test/utils.ts +++ b/test/e2e/yarn-pnp/test/utils.ts @@ -7,49 +7,54 @@ import { NextInstance } from 'test/lib/next-modes/base' jest.setTimeout(2 * 60 * 1000) export function runTests(example = '') { - let next: NextInstance + const versionParts = process.versions.node.split('.').map((i) => Number(i)) - beforeAll(async () => { - const srcDir = join(__dirname, '../../../../examples', example) - const srcFiles = await fs.readdir(srcDir) + if ( + versionParts[0] > 16 || + (versionParts[0] === 16 && versionParts[1] >= 14) + ) { + let next: NextInstance - const packageJson = await fs.readJson(join(srcDir, 'package.json')) + beforeAll(async () => { + const srcDir = join(__dirname, '../../../../examples', example) + const srcFiles = await fs.readdir(srcDir) - next = await createNext({ - files: srcFiles.reduce((prev, file) => { - if (file !== 'package.json') { - prev[file] = new FileRef(join(srcDir, file)) - } - return prev - }, {} as { [key: string]: FileRef }), - dependencies: { - ...packageJson.dependencies, - ...packageJson.devDependencies, - }, - installCommand: ({ dependencies }) => { - const pkgs = Object.keys(dependencies).reduce((prev, cur) => { - prev.push(`${cur}@${dependencies[cur]}`) + const packageJson = await fs.readJson(join(srcDir, 'package.json')) + + next = await createNext({ + files: srcFiles.reduce((prev, file) => { + if (file !== 'package.json') { + prev[file] = new FileRef(join(srcDir, file)) + } return prev - }, [] as string[]) - return `yarn set version 3.1.1 && yarn config set enableGlobalCache true && yarn config set compressionLevel 0 && yarn add ${pkgs.join( - ' ' - )}` - }, - buildCommand: `yarn next build --no-lint`, - startCommand: (global as any).isNextDev ? `yarn next` : `yarn next start`, + }, {} as { [key: string]: FileRef }), + dependencies: { + ...packageJson.dependencies, + ...packageJson.devDependencies, + }, + installCommand: ({ dependencies }) => { + const pkgs = Object.keys(dependencies).reduce((prev, cur) => { + prev.push(`${cur}@${dependencies[cur]}`) + return prev + }, [] as string[]) + return `yarn set version berry && yarn config set enableGlobalCache true && yarn config set compressionLevel 0 && yarn add ${pkgs.join( + ' ' + )}` + }, + buildCommand: `yarn next build --no-lint`, + startCommand: (global as any).isNextDev + ? `yarn next` + : `yarn next start`, + }) }) - }) - afterAll(() => next?.destroy()) - - it('should warn on not fully supported node versions', async () => { - expect(next.cliOutput).toContain( - 'Node.js 16.14+ is required for Yarn PnP 3.20+. More info' - ) - }) + afterAll(() => next?.destroy()) - it(`should compile and serve the index page correctly ${example}`, async () => { - const res = await fetchViaHTTP(next.url, '/') - expect(res.status).toBe(200) - expect(await res.text()).toContain(' { + const res = await fetchViaHTTP(next.url, '/') + expect(res.status).toBe(200) + expect(await res.text()).toContain(' {}) + } }