Skip to content

Commit b4c68ec

Browse files
authoredApr 28, 2024··
ai/rsc: ReadableStream as provider for streamable value; add .append() method (#1460)
1 parent 400b2b1 commit b4c68ec

File tree

3 files changed

+416
-2
lines changed

3 files changed

+416
-2
lines changed
 

‎.changeset/clean-planes-reflect.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
ai/rsc: ReadableStream as provider for createStreamableValue; add .append() method

‎packages/core/rsc/shared-client/streamable.ui.test.tsx

+277
Original file line numberDiff line numberDiff line change
@@ -228,5 +228,282 @@ describe('rsc - readStreamableValue()', () => {
228228
]
229229
`);
230230
});
231+
232+
it('should be able to call .append() to send patches', async () => {
233+
const streamable = createStreamableValue();
234+
const value = streamable.value;
235+
236+
streamable.append('hello');
237+
streamable.append(' world');
238+
streamable.append('!');
239+
streamable.done();
240+
241+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
242+
[
243+
{
244+
"curr": undefined,
245+
"type": Symbol(ui.streamable.value),
246+
},
247+
{
248+
"curr": "hello",
249+
},
250+
{
251+
"diff": [
252+
0,
253+
" world",
254+
],
255+
},
256+
{
257+
"diff": [
258+
0,
259+
"!",
260+
],
261+
},
262+
{},
263+
]
264+
`);
265+
266+
const values = [];
267+
for await (const v of readStreamableValue(value)) {
268+
values.push(v);
269+
}
270+
expect(values).toMatchInlineSnapshot(`
271+
[
272+
"hello",
273+
"hello world",
274+
"hello world!",
275+
]
276+
`);
277+
});
278+
279+
it('should be able to mix .update() and .append() with optimized payloads', async () => {
280+
const streamable = createStreamableValue('hello');
281+
const value = streamable.value;
282+
283+
streamable.append(' world');
284+
streamable.update('hello world!!');
285+
streamable.update('some new');
286+
streamable.update('some new string');
287+
streamable.append(' with patch!');
288+
streamable.done();
289+
290+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
291+
[
292+
{
293+
"curr": "hello",
294+
"type": Symbol(ui.streamable.value),
295+
},
296+
{
297+
"diff": [
298+
0,
299+
" world",
300+
],
301+
},
302+
{
303+
"diff": [
304+
0,
305+
"!!",
306+
],
307+
},
308+
{
309+
"curr": "some new",
310+
},
311+
{
312+
"diff": [
313+
0,
314+
" string",
315+
],
316+
},
317+
{
318+
"diff": [
319+
0,
320+
" with patch!",
321+
],
322+
},
323+
{},
324+
]
325+
`);
326+
327+
const values = [];
328+
for await (const v of readStreamableValue(value)) {
329+
values.push(v);
330+
}
331+
expect(values).toMatchInlineSnapshot(`
332+
[
333+
"hello",
334+
"hello world",
335+
"hello world!!",
336+
"some new",
337+
"some new string",
338+
"some new string with patch!",
339+
]
340+
`);
341+
});
342+
343+
it('should behave like .update() with .append() and .done()', async () => {
344+
const streamable = createStreamableValue('hello');
345+
const value = streamable.value;
346+
347+
streamable.append(' world');
348+
streamable.done('fin');
349+
350+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
351+
[
352+
{
353+
"curr": "hello",
354+
"type": Symbol(ui.streamable.value),
355+
},
356+
{
357+
"diff": [
358+
0,
359+
" world",
360+
],
361+
},
362+
{
363+
"curr": "fin",
364+
},
365+
]
366+
`);
367+
368+
const values = [];
369+
for await (const v of readStreamableValue(value)) {
370+
values.push(v);
371+
}
372+
expect(values).toMatchInlineSnapshot(`
373+
[
374+
"hello",
375+
"hello world",
376+
"fin",
377+
]
378+
`);
379+
});
380+
});
381+
382+
describe('readableStream', () => {
383+
it('should be able to accept readableStream as the source', async () => {
384+
const streamable = createStreamableValue(
385+
new ReadableStream({
386+
start(controller) {
387+
controller.enqueue('hello');
388+
controller.enqueue(' world');
389+
controller.enqueue('!');
390+
controller.close();
391+
},
392+
}),
393+
);
394+
const value = streamable.value;
395+
396+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
397+
[
398+
{
399+
"curr": undefined,
400+
"type": Symbol(ui.streamable.value),
401+
},
402+
{
403+
"curr": "hello",
404+
},
405+
{
406+
"diff": [
407+
0,
408+
" world",
409+
],
410+
},
411+
{
412+
"diff": [
413+
0,
414+
"!",
415+
],
416+
},
417+
{},
418+
]
419+
`);
420+
421+
const values = [];
422+
for await (const v of readStreamableValue(value)) {
423+
values.push(v);
424+
}
425+
expect(values).toMatchInlineSnapshot(`
426+
[
427+
"hello",
428+
"hello world",
429+
"hello world!",
430+
]
431+
`);
432+
});
433+
434+
it('should accept readableStream with JSON payloads', async () => {
435+
const streamable = createStreamableValue(
436+
new ReadableStream({
437+
start(controller) {
438+
controller.enqueue({ v: 1 });
439+
controller.enqueue({ v: 2 });
440+
controller.enqueue({ v: 3 });
441+
controller.close();
442+
},
443+
}),
444+
);
445+
const value = streamable.value;
446+
447+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
448+
[
449+
{
450+
"curr": undefined,
451+
"type": Symbol(ui.streamable.value),
452+
},
453+
{
454+
"curr": {
455+
"v": 1,
456+
},
457+
},
458+
{
459+
"curr": {
460+
"v": 2,
461+
},
462+
},
463+
{
464+
"curr": {
465+
"v": 3,
466+
},
467+
},
468+
{},
469+
]
470+
`);
471+
472+
const values = [];
473+
for await (const v of readStreamableValue(value)) {
474+
values.push(v);
475+
}
476+
expect(values).toMatchInlineSnapshot(`
477+
[
478+
{
479+
"v": 1,
480+
},
481+
{
482+
"v": 2,
483+
},
484+
{
485+
"v": 3,
486+
},
487+
]
488+
`);
489+
});
490+
491+
it('should lock the streamable if from readableStream', async () => {
492+
const streamable = createStreamableValue(
493+
new ReadableStream({
494+
async start(controller) {
495+
await nextTick();
496+
controller.enqueue('hello');
497+
controller.close();
498+
},
499+
}),
500+
);
501+
502+
expect(() =>
503+
streamable.update('world'),
504+
).toThrowErrorMatchingInlineSnapshot(
505+
'".update(): Value stream is locked and cannot be updated."',
506+
);
507+
});
231508
});
232509
});

‎packages/core/rsc/streamable.tsx

+134-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type { StreamablePatch, StreamableValue } from './types';
2121
* Create a piece of changable UI that can be streamed to the client.
2222
* On the client side, it can be rendered as a normal React node.
2323
*/
24-
export function createStreamableUI(initialValue?: React.ReactNode) {
24+
function createStreamableUI(initialValue?: React.ReactNode) {
2525
let currentValue = initialValue;
2626
let closed = false;
2727
let { row, resolve, reject } = createSuspensedChunk(initialValue);
@@ -136,12 +136,72 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
136136
};
137137
}
138138

139+
const STREAMABLE_VALUE_INTERNAL_LOCK = Symbol('streamable.value.lock');
140+
139141
/**
140142
* Create a wrapped, changable value that can be streamed to the client.
141143
* On the client side, the value can be accessed via the readStreamableValue() API.
142144
*/
143-
export function createStreamableValue<T = any, E = any>(initialValue?: T) {
145+
function createStreamableValue<T = any, E = any>(
146+
initialValue?: T | ReadableStream<T>,
147+
) {
148+
const isReadableStream =
149+
initialValue instanceof ReadableStream ||
150+
(typeof initialValue === 'object' &&
151+
initialValue !== null &&
152+
'getReader' in initialValue &&
153+
typeof initialValue.getReader === 'function' &&
154+
'locked' in initialValue &&
155+
typeof initialValue.locked === 'boolean');
156+
157+
if (!isReadableStream) {
158+
return createStreamableValueImpl<T, E>(initialValue);
159+
}
160+
161+
const streamableValue = createStreamableValueImpl<T, E>();
162+
163+
// Since the streamable value will be from a readable stream, it's not allowed
164+
// to update the value manually as that introduces race conditions and
165+
// unexpected behavior.
166+
// We lock the value to prevent any updates from the user.
167+
streamableValue[STREAMABLE_VALUE_INTERNAL_LOCK] = true;
168+
169+
(async () => {
170+
try {
171+
// Consume the readable stream and update the value.
172+
const reader = initialValue.getReader();
173+
174+
while (true) {
175+
const { value, done } = await reader.read();
176+
if (done) {
177+
break;
178+
}
179+
180+
// Unlock the value to allow updates.
181+
streamableValue[STREAMABLE_VALUE_INTERNAL_LOCK] = false;
182+
if (typeof value === 'string') {
183+
streamableValue.append(value);
184+
} else {
185+
streamableValue.update(value);
186+
}
187+
// Lock the value again.
188+
streamableValue[STREAMABLE_VALUE_INTERNAL_LOCK] = true;
189+
}
190+
191+
streamableValue[STREAMABLE_VALUE_INTERNAL_LOCK] = false;
192+
streamableValue.done();
193+
} catch (e) {
194+
streamableValue[STREAMABLE_VALUE_INTERNAL_LOCK] = false;
195+
streamableValue.error(e);
196+
}
197+
})();
198+
199+
return streamableValue;
200+
}
201+
202+
function createStreamableValueImpl<T = any, E = any>(initialValue?: T) {
144203
let closed = false;
204+
let locked = false;
145205
let resolvable = createResolvablePromise<StreamableValue<T, E>>();
146206

147207
let currentValue = initialValue;
@@ -154,6 +214,11 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
154214
if (closed) {
155215
throw new Error(method + ': Value stream is already closed.');
156216
}
217+
if (locked) {
218+
throw new Error(
219+
method + ': Value stream is locked and cannot be updated.',
220+
);
221+
}
157222
}
158223

159224
let warningTimeout: NodeJS.Timeout | undefined;
@@ -212,6 +277,13 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
212277
}
213278

214279
return {
280+
/**
281+
* @internal This is an internal lock to prevent the value from being
282+
* updated by the user.
283+
*/
284+
set [STREAMABLE_VALUE_INTERNAL_LOCK](state: boolean) {
285+
locked = state;
286+
},
215287
/**
216288
* The value of the streamable. This can be returned from a Server Action and
217289
* received by the client. To read the streamed values, use the
@@ -235,6 +307,56 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
235307

236308
warnUnclosedStream();
237309
},
310+
/**
311+
* This method is used to append a delta string to the current value. It
312+
* requires the current value of the streamable to be a string.
313+
*
314+
* @example
315+
* ```jsx
316+
* const streamable = createStreamableValue('hello');
317+
* streamable.append(' world');
318+
*
319+
* // The value will be 'hello world'
320+
* ```
321+
*/
322+
append(value: T) {
323+
assertStream('.append()');
324+
325+
if (
326+
typeof currentValue !== 'string' &&
327+
typeof currentValue !== 'undefined'
328+
) {
329+
throw new Error(
330+
`.append(): The current value is not a string. Received: ${typeof currentValue}`,
331+
);
332+
}
333+
if (typeof value !== 'string') {
334+
throw new Error(
335+
`.append(): The value is not a string. Received: ${typeof value}`,
336+
);
337+
}
338+
339+
const resolvePrevious = resolvable.resolve;
340+
resolvable = createResolvablePromise();
341+
342+
if (typeof currentValue === 'string') {
343+
currentPatchValue = [0, value];
344+
(currentValue as string) = currentValue + value;
345+
} else {
346+
currentPatchValue = undefined;
347+
currentValue = value;
348+
}
349+
350+
currentPromise = resolvable.promise;
351+
resolvePrevious(createWrapped());
352+
353+
warnUnclosedStream();
354+
},
355+
/**
356+
* This method is used to signal that there is an error in the value stream.
357+
* It will be thrown on the client side when consumed via
358+
* `readStreamableValue` or `useStreamableValue`.
359+
*/
238360
error(error: any) {
239361
assertStream('.error()');
240362

@@ -247,6 +369,14 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
247369

248370
resolvable.resolve({ error });
249371
},
372+
/**
373+
* This method marks the value as finalized. You can either call it without
374+
* any parameters or with a new value as the final state.
375+
* Once called, the value cannot be updated or appended anymore.
376+
*
377+
* This method is always **required** to be called, otherwise the response
378+
* will be stuck in a loading state.
379+
*/
250380
done(...args: [] | [T]) {
251381
assertStream('.done()');
252382

@@ -267,6 +397,8 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
267397
};
268398
}
269399

400+
export { createStreamableUI, createStreamableValue };
401+
270402
type Streamable = ReactNode | Promise<ReactNode>;
271403
type Renderer<T> = (
272404
props: T,

0 commit comments

Comments
 (0)
Please sign in to comment.