Skip to content

Commit

Permalink
ai/core: add pipeTextStreamToResponse helper to streamText. (#1442)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgrammel committed Apr 25, 2024
1 parent 5189802 commit 0e0d2af
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-islands-melt.md
@@ -0,0 +1,5 @@
---
'ai': patch
---

ai/core: add pipeTextStreamToResponse helper to streamText.
88 changes: 87 additions & 1 deletion packages/core/core/generate-text/stream-text.test.ts
Expand Up @@ -269,7 +269,7 @@ describe('result.toAIStream', () => {
});

describe('result.pipeAIStreamToResponse', async () => {
it('should write text deltas to a Node.js response-like object', async () => {
it('should write data stream parts to a Node.js response-like object', async () => {
const mockResponse = createMockServerResponse();

const result = await experimental_streamText({
Expand Down Expand Up @@ -315,6 +315,92 @@ describe('result.pipeAIStreamToResponse', async () => {
});
});

describe('result.pipeTextStreamToResponse', async () => {
it('should write text deltas to a Node.js response-like object', async () => {
const mockResponse = createMockServerResponse();

const result = await experimental_streamText({
model: new MockLanguageModelV1({
doStream: async () => {
return {
stream: convertArrayToReadableStream([
{ type: 'text-delta', textDelta: 'Hello' },
{ type: 'text-delta', textDelta: ', ' },
{ type: 'text-delta', textDelta: 'world!' },
]),
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
};
},
}),
prompt: 'test-input',
});

result.pipeTextStreamToResponse(mockResponse);

// Wait for the stream to finish writing to the mock response
await new Promise(resolve => {
const checkIfEnded = () => {
if (mockResponse.ended) {
resolve(undefined);
} else {
setImmediate(checkIfEnded);
}
};
checkIfEnded();
});

const decoder = new TextDecoder();

assert.strictEqual(mockResponse.statusCode, 200);
assert.deepStrictEqual(mockResponse.headers, {
'Content-Type': 'text/plain; charset=utf-8',
});
assert.deepStrictEqual(
mockResponse.writtenChunks.map(chunk => decoder.decode(chunk)),
['Hello', ', ', 'world!'],
);
});
});

describe('result.toAIStreamResponse', () => {
it('should create a Response with a stream data stream', async () => {
const result = await experimental_streamText({
model: new MockLanguageModelV1({
doStream: async ({ prompt, mode }) => {
return {
stream: convertArrayToReadableStream([
{ type: 'text-delta', textDelta: 'Hello' },
{ type: 'text-delta', textDelta: ', ' },
{ type: 'text-delta', textDelta: 'world!' },
]),
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
};
},
}),
prompt: 'test-input',
});

const response = result.toAIStreamResponse();

assert.strictEqual(response.status, 200);
assert.strictEqual(
response.headers.get('Content-Type'),
'text/plain; charset=utf-8',
);

// Read the chunks into an array
const reader = response.body!.getReader();
const chunks = [];
while (true) {
const { value, done } = await reader.read();
if (done) break;
chunks.push(new TextDecoder().decode(value));
}

assert.deepStrictEqual(chunks, ['0:"Hello"\n', '0:", "\n', '0:"world!"\n']);
});
});

describe('result.toTextStreamResponse', () => {
it('should create a Response with a text stream', async () => {
const result = await experimental_streamText({
Expand Down
39 changes: 38 additions & 1 deletion packages/core/core/generate-text/stream-text.ts
Expand Up @@ -240,7 +240,7 @@ Stream callbacks that will be called when the stream emits events.
/**
Writes stream data output to a Node.js response-like object.
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
writes each text delta as a separate chunk.
writes each stream data part as a separate chunk.
@param response A Node.js response-like object (ServerResponse).
@param init Optional headers and status code.
Expand Down Expand Up @@ -276,6 +276,43 @@ writes each text delta as a separate chunk.
read();
}

/**
Writes text delta output to a Node.js response-like object.
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
writes each text delta as a separate chunk.
@param response A Node.js response-like object (ServerResponse).
@param init Optional headers and status code.
*/
pipeTextStreamToResponse(
response: ServerResponse,
init?: { headers?: Record<string, string>; status?: number },
) {
response.writeHead(init?.status ?? 200, {
'Content-Type': 'text/plain; charset=utf-8',
...init?.headers,
});

const reader = this.textStream.getReader();

const read = async () => {
const encoder = new TextEncoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
response.write(encoder.encode(value));
}
} catch (error) {
throw error;
} finally {
response.end();
}
};

read();
}

/**
Converts the result to a streamed response object with a stream data part stream.
It can be used with the `useChat` and `useCompletion` hooks.
Expand Down

0 comments on commit 0e0d2af

Please sign in to comment.