diff --git a/.changeset/twelve-geckos-melt.md b/.changeset/twelve-geckos-melt.md new file mode 100644 index 00000000000..1a59b86f996 --- /dev/null +++ b/.changeset/twelve-geckos-melt.md @@ -0,0 +1,7 @@ +--- +"miniflare": patch +--- + +fix: make sure the magic proxy can handle multiple parallel R2 stream reads + +Currently, trying to read multiple R2 streams in parallel (via `Promise.all`, for example) leads to a deadlock that prevents any of the target streams from being read. This is caused by the underlying implementation only allowing a single HTTP connection to the Workers runtime at a time. This change fixes the issue by allowing multiple parallel HTTP connections. diff --git a/packages/miniflare/src/plugins/core/proxy/fetch-sync.ts b/packages/miniflare/src/plugins/core/proxy/fetch-sync.ts index b017a7a40b0..937ad42b223 100644 --- a/packages/miniflare/src/plugins/core/proxy/fetch-sync.ts +++ b/packages/miniflare/src/plugins/core/proxy/fetch-sync.ts @@ -39,23 +39,23 @@ const { notifyHandle, port, filename } = workerData; // with "Error: Cannot find module 'undici'". Instead we need to create a // 'require' using the '__filename' of the host... 
:( const actualRequire = createRequire(filename); -const { Client, fetch } = actualRequire("undici"); +const { Pool, fetch } = actualRequire("undici"); -let clientUrl; -let client; +let dispatcherUrl; +let dispatcher; port.addEventListener("message", async (event) => { const { id, method, url, headers, body } = event.data; - if (clientUrl !== url) { - clientUrl = url; - client = new Client(url, { + if (dispatcherUrl !== url) { + dispatcherUrl = url; + dispatcher = new Pool(url, { connect: { rejectUnauthorized: false }, }); } headers["${CoreHeaders.OP_SYNC}"] = "true"; try { // body cannot be a ReadableStream, so no need to specify duplex - const response = await fetch(url, { method, headers, body, dispatcher: client }); + const response = await fetch(url, { method, headers, body, dispatcher }); const responseBody = response.headers.get("${CoreHeaders.OP_RESULT_TYPE}") === "ReadableStream" ? response.body : await response.arrayBuffer(); diff --git a/packages/miniflare/test/plugins/core/proxy/client.spec.ts b/packages/miniflare/test/plugins/core/proxy/client.spec.ts index 381fe3e0a83..c868d618d3d 100644 --- a/packages/miniflare/test/plugins/core/proxy/client.spec.ts +++ b/packages/miniflare/test/plugins/core/proxy/client.spec.ts @@ -2,7 +2,7 @@ import assert from "assert"; import { Blob } from "buffer"; import http from "http"; import { text } from "stream/consumers"; -import { ReadableStream } from "stream/web"; +import { ReadableStream, WritableStream } from "stream/web"; import util from "util"; import type { Fetcher } from "@cloudflare/workers-types/experimental"; import test, { ThrowsExpectation } from "ava"; @@ -244,6 +244,55 @@ test("ProxyClient: returns empty ReadableStream synchronously", async (t) => { assert(objectBody != null); t.is(await text(objectBody.body), ""); // Synchronous empty stream access }); +test("ProxyClient: returns multiple ReadableStreams in parallel", async (t) => { /* Stores three R2 objects, reads all three bodies concurrently via Promise.all, then checks that each read logged start, chunk, close and end. */ + const mf = new Miniflare({ script: nullScript, r2Buckets: ["BUCKET"] 
}); + t.teardown(() => mf.dispose()); + + const logs: string[] = []; /* Event log: readStream() appends to it and the assertions at the end inspect it. */ + + const bucket = await mf.getR2Bucket("BUCKET"); + + const str = new Array(500000) + .fill(null) + .map(() => "test") + .join(""); /* "test" repeated 500000 times, i.e. 2,000,000 characters. NOTE(review): presumably large so that the body is delivered as a stream rather than buffered; confirm. */ + + const objectKeys = ["obj-1", "obj-2", "obj-3"]; + + for (const objectKey of objectKeys) { /* Uploads are awaited one at a time; only the reads below overlap. */ + await bucket.put(objectKey, str); + } + + await Promise.all( /* Every get() is issued before any result is awaited, so the three reads are in flight together. */ + objectKeys.map((objectKey) => + bucket.get(objectKey).then((obj) => readStream(objectKey, obj?.body)) + ) + ); + + async function readStream(objectKey: string, stream?: ReadableStream) { /* Function declaration, so it is hoisted and callable from the Promise.all above. Pipes the stream into a sink that records write and close events in logs. */ + logs.push(`[${objectKey}] stream start`); + if (!stream) return; /* No body (obj was nullish): only "stream start" is logged, so the chunk, close and end assertions below fail. */ + await stream.pipeTo( + new WritableStream({ + write(_chunk) { + logs.push(`[${objectKey}] stream chunk`); + }, + close() { + logs.push(`[${objectKey}] stream close`); + }, + }) + ); + logs.push(`[${objectKey}] stream end`); /* Reached only after pipeTo() has resolved. */ + } + + for (const objectKey of objectKeys) { /* Each object must have logged all four events at least once. */ + t.is(logs.includes(`[${objectKey}] stream start`), true); + t.is(logs.includes(`[${objectKey}] stream chunk`), true); + t.is(logs.includes(`[${objectKey}] stream close`), true); + t.is(logs.includes(`[${objectKey}] stream end`), true); + } +}); + test("ProxyClient: can `JSON.stringify()` proxies", async (t) => { const mf = new Miniflare({ script: nullScript, r2Buckets: ["BUCKET"] }); t.teardown(() => mf.dispose());