Skip to content

Commit a94aab2

Browse files
shudingMaxLeiter
andauthoredMar 9, 2024··
fix(ai/rsc): Optimize streamable value (#1130)
Co-authored-by: Max Leiter <max.leiter@vercel.com>
1 parent eccbec9 commit a94aab2

File tree

3 files changed

+114
-32
lines changed

3 files changed

+114
-32
lines changed
 

‎.changeset/large-rivers-tease.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
ai/rsc: optimize streamable value stream size

‎packages/core/rsc/shared-client/streamable.ui.test.tsx

+73-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { createStreamableValue } from '../streamable';
22
import { readStreamableValue } from './streamable';
33

4+
function nextTick() {
5+
return Promise.resolve();
6+
}
7+
48
describe('rsc - readStreamableValue()', () => {
59
it('should return an async iterable', () => {
610
const streamable = createStreamableValue();
@@ -11,20 +15,81 @@ describe('rsc - readStreamableValue()', () => {
1115
expect(result[Symbol.asyncIterator]).toBeDefined();
1216
});
1317

14-
it('should be able to read all values', async () => {
15-
const streamable = createStreamableValue(0);
18+
it('should directly emit the final value when reading .value', async () => {
19+
const streamable = createStreamableValue('1');
20+
streamable.update('2');
21+
streamable.update('3');
22+
23+
expect(streamable.value).toMatchInlineSnapshot(`
24+
{
25+
"curr": "3",
26+
"next": Promise {},
27+
"type": Symbol(ui.streamable.value),
28+
}
29+
`);
30+
31+
streamable.done('4');
32+
33+
expect(streamable.value).toMatchInlineSnapshot(`
34+
{
35+
"curr": "4",
36+
"type": Symbol(ui.streamable.value),
37+
}
38+
`);
39+
});
40+
41+
it('should be able to stream any JSON values', async () => {
42+
const streamable = createStreamableValue();
43+
streamable.update({ v: 123 });
44+
45+
expect(streamable.value).toMatchInlineSnapshot(`
46+
{
47+
"curr": {
48+
"v": 123,
49+
},
50+
"next": Promise {},
51+
"type": Symbol(ui.streamable.value),
52+
}
53+
`);
54+
55+
streamable.done();
56+
});
1657

17-
streamable.update(1);
18-
streamable.update(2);
19-
streamable.done(3);
58+
it('should support .error()', async () => {
59+
const streamable = createStreamableValue();
60+
streamable.error('This is an error');
61+
62+
expect(streamable.value).toMatchInlineSnapshot(`
63+
{
64+
"error": "This is an error",
65+
"type": Symbol(ui.streamable.value),
66+
}
67+
`);
68+
});
69+
70+
it('should support reading streamed values and errors', async () => {
71+
const streamable = createStreamableValue(1);
72+
(async () => {
73+
await nextTick();
74+
streamable.update(2);
75+
await nextTick();
76+
streamable.update(3);
77+
await nextTick();
78+
streamable.error('This is an error');
79+
})();
2080

2181
const values = [];
22-
for await (const v of readStreamableValue(streamable.value)) {
23-
values.push(v);
82+
83+
try {
84+
for await (const v of readStreamableValue(streamable.value)) {
85+
values.push(v);
86+
}
87+
} catch (e) {
88+
expect(e).toMatchInlineSnapshot(`"This is an error"`);
2489
}
90+
2591
expect(values).toMatchInlineSnapshot(`
2692
[
27-
0,
2893
1,
2994
2,
3095
3,

‎packages/core/rsc/streamable.tsx

+36-24
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ export function createStreamableUI(initialValue?: React.ReactNode) {
113113
*/
114114
export function createStreamableValue<T = any, E = any>(initialValue?: T) {
115115
let closed = false;
116-
let { promise, resolve } = createResolvablePromise<StreamableValue<T, E>>();
116+
let resolvable = createResolvablePromise<StreamableValue<T, E>>();
117+
118+
let currentValue = initialValue;
119+
let currentError: E | undefined;
120+
let currentPromise: typeof resolvable.promise | undefined =
121+
resolvable.promise;
117122

118123
function assertStream(method: string) {
119124
if (closed) {
@@ -136,35 +141,37 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
136141
}
137142
warnUnclosedStream();
138143

139-
function createWrapped(
140-
val: T | undefined,
141-
initial?: boolean,
142-
): StreamableValue<T, E> {
143-
if (initial) {
144-
return {
145-
type: STREAMABLE_VALUE_TYPE,
146-
curr: val,
147-
next: promise,
148-
};
144+
function createWrapped(withType?: boolean): StreamableValue<T, E> {
145+
// This makes the payload much smaller if there're mutative updates before the first read.
146+
const init: Partial<StreamableValue<T, E>> =
147+
currentError === undefined
148+
? { curr: currentValue }
149+
: { error: currentError };
150+
151+
if (currentPromise) {
152+
init.next = currentPromise;
149153
}
150154

151-
return {
152-
curr: val,
153-
next: promise,
154-
};
155+
if (withType) {
156+
init.type = STREAMABLE_VALUE_TYPE;
157+
}
158+
159+
return init;
155160
}
156161

157162
return {
158-
value: createWrapped(initialValue, true),
163+
get value() {
164+
return createWrapped(true);
165+
},
159166
update(value: T) {
160167
assertStream('.update()');
161168

162-
const resolvePrevious = resolve;
163-
const resolvable = createResolvablePromise();
164-
promise = resolvable.promise;
165-
resolve = resolvable.resolve;
169+
const resolvePrevious = resolvable.resolve;
170+
resolvable = createResolvablePromise();
166171

167-
resolvePrevious(createWrapped(value));
172+
currentValue = value;
173+
currentPromise = resolvable.promise;
174+
resolvePrevious(createWrapped());
168175

169176
warnUnclosedStream();
170177
},
@@ -175,7 +182,10 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
175182
clearTimeout(warningTimeout);
176183
}
177184
closed = true;
178-
resolve({ error });
185+
currentError = error;
186+
currentPromise = undefined;
187+
188+
resolvable.resolve({ error });
179189
},
180190
done(...args: any) {
181191
assertStream('.done()');
@@ -184,13 +194,15 @@ export function createStreamableValue<T = any, E = any>(initialValue?: T) {
184194
clearTimeout(warningTimeout);
185195
}
186196
closed = true;
197+
currentPromise = undefined;
187198

188199
if (args.length) {
189-
resolve({ curr: args[0] });
200+
currentValue = args[0];
201+
resolvable.resolve({ curr: args[0] });
190202
return;
191203
}
192204

193-
resolve({});
205+
resolvable.resolve({});
194206
},
195207
};
196208
}

0 commit comments

Comments
 (0)
Please sign in to comment.