Skip to content

Commit

Permalink
[miniflare] fix: make sure the magic proxy can handle multiple parall…
Browse files Browse the repository at this point in the history
…el R2 stream reads (#5491)

---------

Co-authored-by: MrBBot <bcoll@cloudflare.com>
  • Loading branch information
dario-piotrowicz and mrbbot committed Apr 3, 2024
1 parent bd00dc8 commit 940ad89
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 8 deletions.
7 changes: 7 additions & 0 deletions .changeset/twelve-geckos-melt.md
@@ -0,0 +1,7 @@
---
"miniflare": patch
---

fix: make sure the magic proxy can handle multiple parallel r2 stream reads

Currently trying to read multiple R2 streams in parallel (via `Promise.all` for example) leads to deadlock which prevents any of the target streams from being read. This is caused by the underlying implementation only allowing a single HTTP connection to the Workers runtime at a time. This change fixes the issue by allowing multiple parallel HTTP connections.
14 changes: 7 additions & 7 deletions packages/miniflare/src/plugins/core/proxy/fetch-sync.ts
Expand Up @@ -39,23 +39,23 @@ const { notifyHandle, port, filename } = workerData;
// with "Error: Cannot find module 'undici'". Instead we need to create a
// 'require' using the '__filename' of the host... :(
const actualRequire = createRequire(filename);
const { Client, fetch } = actualRequire("undici");
const { Pool, fetch } = actualRequire("undici");
let clientUrl;
let client;
let dispatcherUrl;
let dispatcher;
port.addEventListener("message", async (event) => {
const { id, method, url, headers, body } = event.data;
if (clientUrl !== url) {
clientUrl = url;
client = new Client(url, {
if (dispatcherUrl !== url) {
dispatcherUrl = url;
dispatcher = new Pool(url, {
connect: { rejectUnauthorized: false },
});
}
headers["${CoreHeaders.OP_SYNC}"] = "true";
try {
// body cannot be a ReadableStream, so no need to specify duplex
const response = await fetch(url, { method, headers, body, dispatcher: client });
const response = await fetch(url, { method, headers, body, dispatcher });
const responseBody = response.headers.get("${CoreHeaders.OP_RESULT_TYPE}") === "ReadableStream"
? response.body
: await response.arrayBuffer();
Expand Down
51 changes: 50 additions & 1 deletion packages/miniflare/test/plugins/core/proxy/client.spec.ts
Expand Up @@ -2,7 +2,7 @@ import assert from "assert";
import { Blob } from "buffer";
import http from "http";
import { text } from "stream/consumers";
import { ReadableStream } from "stream/web";
import { ReadableStream, WritableStream } from "stream/web";
import util from "util";
import type { Fetcher } from "@cloudflare/workers-types/experimental";
import test, { ThrowsExpectation } from "ava";
Expand Down Expand Up @@ -244,6 +244,55 @@ test("ProxyClient: returns empty ReadableStream synchronously", async (t) => {
assert(objectBody != null);
t.is(await text(objectBody.body), ""); // Synchronous empty stream access
});
test("ProxyClient: returns multiple ReadableStreams in parallel", async (t) => {
const mf = new Miniflare({ script: nullScript, r2Buckets: ["BUCKET"] });
t.teardown(() => mf.dispose());

const logs: string[] = [];

const bucket = await mf.getR2Bucket("BUCKET");

const str = new Array(500000)
.fill(null)
.map(() => "test")
.join("");

const objectKeys = ["obj-1", "obj-2", "obj-3"];

for (const objectKey of objectKeys) {
await bucket.put(objectKey, str);
}

await Promise.all(
objectKeys.map((objectKey) =>
bucket.get(objectKey).then((obj) => readStream(objectKey, obj?.body))
)
);

async function readStream(objectKey: string, stream?: ReadableStream) {
logs.push(`[${objectKey}] stream start`);
if (!stream) return;
await stream.pipeTo(
new WritableStream({
write(_chunk) {
logs.push(`[${objectKey}] stream chunk`);
},
close() {
logs.push(`[${objectKey}] stream close`);
},
})
);
logs.push(`[${objectKey}] stream end`);
}

for (const objectKey of objectKeys) {
t.is(logs.includes(`[${objectKey}] stream start`), true);
t.is(logs.includes(`[${objectKey}] stream chunk`), true);
t.is(logs.includes(`[${objectKey}] stream close`), true);
t.is(logs.includes(`[${objectKey}] stream end`), true);
}
});

test("ProxyClient: can `JSON.stringify()` proxies", async (t) => {
const mf = new Miniflare({ script: nullScript, r2Buckets: ["BUCKET"] });
t.teardown(() => mf.dispose());
Expand Down

0 comments on commit 940ad89

Please sign in to comment.