Skip to content

Commit

Permalink
ai/rsc: ReadableStream as provider for streamable value; add `.append…
Browse files Browse the repository at this point in the history
…()` method (#1460)
  • Loading branch information
shuding committed Apr 28, 2024
1 parent 400b2b1 commit b4c68ec
Show file tree
Hide file tree
Showing 3 changed files with 416 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/clean-planes-reflect.md
@@ -0,0 +1,5 @@
---
'ai': patch
---

ai/rsc: ReadableStream as provider for createStreamableValue; add .append() method
277 changes: 277 additions & 0 deletions packages/core/rsc/shared-client/streamable.ui.test.tsx
Expand Up @@ -228,5 +228,282 @@ describe('rsc - readStreamableValue()', () => {
]
`);
});

it('should be able to call .append() to send patches', async () => {
const streamable = createStreamableValue();
const value = streamable.value;

streamable.append('hello');
streamable.append(' world');
streamable.append('!');
streamable.done();

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": undefined,
"type": Symbol(ui.streamable.value),
},
{
"curr": "hello",
},
{
"diff": [
0,
" world",
],
},
{
"diff": [
0,
"!",
],
},
{},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
"hello",
"hello world",
"hello world!",
]
`);
});

it('should be able to mix .update() and .append() with optimized payloads', async () => {
const streamable = createStreamableValue('hello');
const value = streamable.value;

streamable.append(' world');
streamable.update('hello world!!');
streamable.update('some new');
streamable.update('some new string');
streamable.append(' with patch!');
streamable.done();

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": "hello",
"type": Symbol(ui.streamable.value),
},
{
"diff": [
0,
" world",
],
},
{
"diff": [
0,
"!!",
],
},
{
"curr": "some new",
},
{
"diff": [
0,
" string",
],
},
{
"diff": [
0,
" with patch!",
],
},
{},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
"hello",
"hello world",
"hello world!!",
"some new",
"some new string",
"some new string with patch!",
]
`);
});

it('should behave like .update() with .append() and .done()', async () => {
const streamable = createStreamableValue('hello');
const value = streamable.value;

streamable.append(' world');
streamable.done('fin');

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": "hello",
"type": Symbol(ui.streamable.value),
},
{
"diff": [
0,
" world",
],
},
{
"curr": "fin",
},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
"hello",
"hello world",
"fin",
]
`);
});
});

describe('readableStream', () => {
it('should be able to accept readableStream as the source', async () => {
const streamable = createStreamableValue(
new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue(' world');
controller.enqueue('!');
controller.close();
},
}),
);
const value = streamable.value;

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": undefined,
"type": Symbol(ui.streamable.value),
},
{
"curr": "hello",
},
{
"diff": [
0,
" world",
],
},
{
"diff": [
0,
"!",
],
},
{},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
"hello",
"hello world",
"hello world!",
]
`);
});

it('should accept readableStream with JSON payloads', async () => {
const streamable = createStreamableValue(
new ReadableStream({
start(controller) {
controller.enqueue({ v: 1 });
controller.enqueue({ v: 2 });
controller.enqueue({ v: 3 });
controller.close();
},
}),
);
const value = streamable.value;

expect(await getRawChunks(value)).toMatchInlineSnapshot(`
[
{
"curr": undefined,
"type": Symbol(ui.streamable.value),
},
{
"curr": {
"v": 1,
},
},
{
"curr": {
"v": 2,
},
},
{
"curr": {
"v": 3,
},
},
{},
]
`);

const values = [];
for await (const v of readStreamableValue(value)) {
values.push(v);
}
expect(values).toMatchInlineSnapshot(`
[
{
"v": 1,
},
{
"v": 2,
},
{
"v": 3,
},
]
`);
});

it('should lock the streamable if from readableStream', async () => {
const streamable = createStreamableValue(
new ReadableStream({
async start(controller) {
await nextTick();
controller.enqueue('hello');
controller.close();
},
}),
);

expect(() =>
streamable.update('world'),
).toThrowErrorMatchingInlineSnapshot(
'".update(): Value stream is locked and cannot be updated."',
);
});
});
});

0 comments on commit b4c68ec

Please sign in to comment.