Skip to content

Commit 20007b9

Browse files
authoredMar 20, 2024··
feat(ai/rsc): Patchable string values (#1190)
1 parent fac9a6e commit 20007b9

File tree

5 files changed

+174
-12
lines changed

5 files changed

+174
-12
lines changed
 

‎.changeset/lemon-beans-bathe.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
feat(ai/rsc): support string diff and patch in streamable value

‎packages/core/rsc/shared-client/streamable.tsx

+26-2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export function readStreamableValue<T = unknown>(
5757
streamableValue;
5858
let curr = row.curr;
5959
let done = false;
60+
let initial = true;
6061

6162
return {
6263
async next() {
@@ -67,8 +68,22 @@ export function readStreamableValue<T = unknown>(
6768
if (typeof row.error !== 'undefined') {
6869
throw row.error;
6970
}
70-
if ('curr' in row) {
71-
curr = row.curr;
71+
if ('curr' in row || row.diff) {
72+
if (row.diff) {
73+
switch (row.diff[0]) {
74+
case 0:
75+
if (typeof curr !== 'string') {
76+
throw new Error(
77+
'Invalid patch: can only append to string types. This is a bug in the AI SDK.',
78+
);
79+
} else {
80+
(curr as string) = curr + row.diff[1];
81+
}
82+
break;
83+
}
84+
} else {
85+
curr = row.curr;
86+
}
7287

7388
// The last emitted { done: true } won't be used as the value
7489
// by the for await...of syntax.
@@ -89,6 +104,15 @@ export function readStreamableValue<T = unknown>(
89104
}
90105

91106
row = row.next;
107+
if (initial) {
108+
initial = false;
109+
if (typeof curr === 'undefined') {
110+
// This is the initial chunk and there isn't an initial value yet.
111+
// Let's skip this one.
112+
return this.next();
113+
}
114+
}
115+
92116
return {
93117
value: curr,
94118
done: false,

‎packages/core/rsc/shared-client/streamable.ui.test.tsx

+69
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ function nextTick() {
55
return Promise.resolve();
66
}
77

8+
async function getRawChunks(s: any) {
9+
const { next, ...otherFields } = s;
10+
const chunks = [otherFields];
11+
if (next) {
12+
chunks.push(...(await getRawChunks(await next)));
13+
}
14+
return chunks;
15+
}
16+
817
describe('rsc - readStreamableValue()', () => {
918
it('should return an async iterable', () => {
1019
const streamable = createStreamableValue();
@@ -160,4 +169,64 @@ describe('rsc - readStreamableValue()', () => {
160169
]
161170
`);
162171
});
172+
173+
describe('patch', () => {
174+
it('should be able to append strings as patch', async () => {
175+
const streamable = createStreamableValue();
176+
const value = streamable.value;
177+
178+
streamable.update('hello');
179+
streamable.update('hello world');
180+
streamable.update('hello world!');
181+
streamable.update('new string');
182+
streamable.done('new string with patch!');
183+
184+
expect(await getRawChunks(value)).toMatchInlineSnapshot(`
185+
[
186+
{
187+
"curr": undefined,
188+
"type": Symbol(ui.streamable.value),
189+
},
190+
{
191+
"curr": "hello",
192+
},
193+
{
194+
"diff": [
195+
0,
196+
" world",
197+
],
198+
},
199+
{
200+
"diff": [
201+
0,
202+
"!",
203+
],
204+
},
205+
{
206+
"curr": "new string",
207+
},
208+
{
209+
"diff": [
210+
0,
211+
" with patch!",
212+
],
213+
},
214+
]
215+
`);
216+
217+
const values = [];
218+
for await (const v of readStreamableValue(value)) {
219+
values.push(v);
220+
}
221+
expect(values).toMatchInlineSnapshot(`
222+
[
223+
"hello",
224+
"hello world",
225+
"hello world!",
226+
"new string",
227+
"new string with patch!",
228+
]
229+
`);
230+
});
231+
});
163232
});

‎packages/core/rsc/streamable.tsx

+71-10
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
createSuspensedChunk,
1616
consumeStream,
1717
} from './utils';
18-
import type { StreamableValue } from './types';
18+
import type { StreamablePatch, StreamableValue } from './types';
1919

2020
/**
2121
* Create a piece of changable UI that can be streamed to the client.
@@ -48,7 +48,13 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
4848
warnUnclosedStream();
4949

5050
return {
51+
/**
52+
* The value of the streamable UI. This can be returned from a Server Action and received by the client.
53+
*/
5154
value: row,
55+
/**
56+
* This method updates the current UI node. It takes a new UI node and replaces the old one.
57+
*/
5258
update(value: React.ReactNode) {
5359
assertStream('.update()');
5460

@@ -67,6 +73,22 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
6773

6874
warnUnclosedStream();
6975
},
76+
/**
77+
* This method is used to append a new UI node to the end of the old one.
78+
* Once appended a new UI node, the previous UI node cannot be updated anymore.
79+
*
80+
* @example
81+
* ```jsx
82+
* const ui = createStreamableUI(<div>hello</div>)
83+
* ui.append(<div>world</div>)
84+
*
85+
* // The UI node will be:
86+
* // <>
87+
* // <div>hello</div>
88+
* // <div>world</div>
89+
* // </>
90+
* ```
91+
*/
7092
append(value: React.ReactNode) {
7193
assertStream('.append()');
7294

@@ -79,6 +101,10 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
79101

80102
warnUnclosedStream();
81103
},
104+
/**
105+
* This method is used to signal that there is an error in the UI stream.
106+
* It will be thrown on the client side and caught by the nearest error boundary component.
107+
*/
82108
error(error: any) {
83109
assertStream('.error()');
84110

@@ -88,6 +114,12 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
88114
closed = true;
89115
reject(error);
90116
},
117+
/**
118+
* This method marks the UI node as finalized. You can either call it without any parameters or with a new UI node as the final state.
119+
* Once called, the UI node cannot be updated or appended anymore.
120+
*
121+
* This method is always **required** to be called, otherwise the response will be stuck in a loading state.
122+
*/
91123
done(...args: [] | [React.ReactNode]) {
92124
assertStream('.done()');
93125

@@ -116,6 +148,7 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
116148
let currentError: E | undefined;
117149
let currentPromise: typeof resolvable.promise | undefined =
118150
resolvable.promise;
151+
let currentPatchValue: StreamablePatch;
119152

120153
function assertStream(method: string) {
121154
if (closed) {
@@ -138,35 +171,63 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
138171
}
139172
warnUnclosedStream();
140173

141-
function createWrapped(withType?: boolean): StreamableValue<T, E> {
174+
function createWrapped(initialChunk?: boolean): StreamableValue<T, E> {
142175
// This makes the payload much smaller if there're mutative updates before the first read.
143-
const init: Partial<StreamableValue<T, E>> =
144-
currentError === undefined
145-
? { curr: currentValue }
146-
: { error: currentError };
176+
let init: Partial<StreamableValue<T, E>>;
177+
178+
if (currentError !== undefined) {
179+
init = { error: currentError };
180+
} else {
181+
if (currentPatchValue && !initialChunk) {
182+
init = { diff: currentPatchValue };
183+
} else {
184+
init = { curr: currentValue };
185+
}
186+
}
147187

148188
if (currentPromise) {
149189
init.next = currentPromise;
150190
}
151191

152-
if (withType) {
192+
if (initialChunk) {
153193
init.type = STREAMABLE_VALUE_TYPE;
154194
}
155195

156196
return init;
157197
}
158198

199+
// Update the internal `currentValue` and `currentPatchValue` if needed.
200+
function updateValueStates(value: T) {
201+
// If we can only send a patch over the wire, it's better to do so.
202+
currentPatchValue = undefined;
203+
if (typeof value === 'string') {
204+
if (typeof currentValue === 'string') {
205+
if (value.startsWith(currentValue)) {
206+
currentPatchValue = [0, value.slice(currentValue.length)];
207+
}
208+
}
209+
}
210+
211+
currentValue = value;
212+
}
213+
159214
return {
215+
/**
216+
* The value of the streamable. This can be returned from a Server Action and received by the client.
217+
*/
160218
get value() {
161219
return createWrapped(true);
162220
},
221+
/**
222+
* This method updates the current value with a new one.
223+
*/
163224
update(value: T) {
164225
assertStream('.update()');
165226

166227
const resolvePrevious = resolvable.resolve;
167228
resolvable = createResolvablePromise();
168229

169-
currentValue = value;
230+
updateValueStates(value);
170231
currentPromise = resolvable.promise;
171232
resolvePrevious(createWrapped());
172233

@@ -194,8 +255,8 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
194255
currentPromise = undefined;
195256

196257
if (args.length) {
197-
currentValue = args[0];
198-
resolvable.resolve({ curr: args[0] });
258+
updateValueStates(args[0]);
259+
resolvable.resolve(createWrapped());
199260
return;
200261
}
201262

‎packages/core/rsc/types.ts

+3
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,12 @@ export type MutableAIState<AIState> = {
8787
done: ((newState: AIState) => void) | (() => void);
8888
};
8989

90+
export type StreamablePatch = undefined | [0, string]; // Append string.
91+
9092
export type StreamableValue<T = any, E = any> = {
9193
type?: typeof STREAMABLE_VALUE_TYPE;
9294
curr?: T;
9395
error?: E;
96+
diff?: StreamablePatch;
9497
next?: Promise<StreamableValue<T, E>>;
9598
};

0 commit comments

Comments
 (0)
Please sign in to comment.