Skip to content

Commit

Permalink
Encode binary streams as a single collapsed Blob
Browse files Browse the repository at this point in the history
Since we buffer anyway, we might as well combine the chunks of binary
streams.

Ideally, we'd be able to just use the stream from the Blob but Node.js
doesn't return byob streams from Blob. Additionally, we don't actually
stream the content of Blobs due to the layering with busboy atm. We could
do that for binary streams in particular by replacing the File layering
with a stream.
  • Loading branch information
sebmarkbage committed May 3, 2024
1 parent ec9400d commit f6b75aa
Showing 1 changed file with 49 additions and 16 deletions.
65 changes: 49 additions & 16 deletions packages/react-client/src/ReactFlightReplyClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ export function processReply(
return '$' + tag + blobId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
function serializeBinaryReader(reader: any): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
Expand All @@ -218,23 +218,43 @@ export function processReply(
pendingParts++;
const streamId = nextPartId++;

// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. It also implies that different chunks can be split up or merged as opposed
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
// received in the same slices.
// $FlowFixMe: This is a Node.js extension.
let supportsBYOB: void | boolean = stream.supportsBYOB;
if (supportsBYOB === undefined) {
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
stream.getReader({mode: 'byob'}).releaseLock();
supportsBYOB = true;
} catch (x) {
supportsBYOB = false;
const buffer = [];

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
const blobId = nextPartId++;
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + blobId, new Blob(buffer));
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(
formFieldPrefix + streamId,
'"$o' + blobId.toString(16) + '"',
);
// eslint-disable-next-line react-internal/safe-string-coercion
data.append(formFieldPrefix + streamId, 'C'); // Close signal
pendingParts--;
if (pendingParts === 0) {
resolve(data);
}
} else {
buffer.push(entry.value);
reader.read(new Uint8Array(1024)).then(progress, reject);
}
}
reader.read(new Uint8Array(1024)).then(progress, reject);

const reader = stream.getReader();
return '$r' + streamId.toString(16);
}

function serializeReader(reader: ReadableStreamReader): string {
if (formData === null) {
// Upgrade to use FormData to allow us to stream this value.
formData = new FormData();
}
const data = formData;

pendingParts++;
const streamId = nextPartId++;

function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
if (entry.done) {
Expand All @@ -258,7 +278,20 @@ export function processReply(
}
reader.read().then(progress, reject);

return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
return '$R' + streamId.toString(16);
}

function serializeReadableStream(stream: ReadableStream): string {
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
// receiving side. For binary streams, we serialize them as plain Blobs.
let binaryReader;
try {
// $FlowFixMe[extra-arg]: This argument is accepted.
binaryReader = stream.getReader({mode: 'byob'});
} catch (x) {
return serializeReader(stream.getReader());
}
return serializeBinaryReader(binaryReader);
}

function serializeAsyncIterable(
Expand Down

0 comments on commit f6b75aa

Please sign in to comment.