Skip to content

Commit f90f6a1

Browse files
authored Apr 23, 2024
ai/core: add pipeAIStreamToResponse to streamText result. (#1404)
1 parent dc407d8 commit f90f6a1

File tree

5 files changed

+131
-11
lines changed

5 files changed

+131
-11
lines changed
 

‎.changeset/eight-roses-clean.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
ai/core: add pipeAIStreamToResponse() to streamText result.

‎examples/next-openai-pages/pages/api/chat-node.ts

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createOpenAI } from '@ai-sdk/openai';
2-
import { experimental_streamText, streamToResponse } from 'ai';
2+
import { experimental_streamText } from 'ai';
33
import { NextApiRequest, NextApiResponse } from 'next';
44

55
// Create an OpenAI Provider instance
@@ -19,12 +19,7 @@ export default async function handler(
1919
messages,
2020
});
2121

22-
// Convert the response into a friendly text-stream
23-
const stream = result.toAIStream();
24-
25-
/**
26-
* Converts the stream to a Node.js Response-like object.
27-
* Please note that this sends the response as one message once it's done.
28-
*/
29-
return streamToResponse(stream, res);
22+
// pipe the AI stream to the response
23+
// (each text delta is written to the response as a separate chunk)
24+
result.pipeAIStreamToResponse(res);
3025
}

‎packages/core/core/generate-text/stream-text.test.ts

+49
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { convertAsyncIterableToArray } from '../test/convert-async-iterable-to-a
55
import { convertReadableStreamToArray } from '../test/convert-readable-stream-to-array';
66
import { MockLanguageModelV1 } from '../test/mock-language-model-v1';
77
import { experimental_streamText } from './stream-text';
8+
import { ServerResponse } from 'node:http';
9+
import { createMockServerResponse } from '../test/mock-server-response';
810

911
describe('result.textStream', () => {
1012
it('should send text deltas', async () => {
@@ -260,6 +262,53 @@ describe('result.toAIStream', () => {
260262
});
261263
});
262264

265+
describe('result.pipeAIStreamToResponse', async () => {
266+
it('should write text deltas to a Node.js response-like object', async () => {
267+
const mockResponse = createMockServerResponse();
268+
269+
const result = await experimental_streamText({
270+
model: new MockLanguageModelV1({
271+
doStream: async () => {
272+
return {
273+
stream: convertArrayToReadableStream([
274+
{ type: 'text-delta', textDelta: 'Hello' },
275+
{ type: 'text-delta', textDelta: ', ' },
276+
{ type: 'text-delta', textDelta: 'world!' },
277+
]),
278+
rawCall: { rawPrompt: 'prompt', rawSettings: {} },
279+
};
280+
},
281+
}),
282+
prompt: 'test-input',
283+
});
284+
285+
result.pipeAIStreamToResponse(mockResponse);
286+
287+
// Wait for the stream to finish writing to the mock response
288+
await new Promise(resolve => {
289+
const checkIfEnded = () => {
290+
if (mockResponse.ended) {
291+
resolve(undefined);
292+
} else {
293+
setImmediate(checkIfEnded);
294+
}
295+
};
296+
checkIfEnded();
297+
});
298+
299+
const decoder = new TextDecoder();
300+
301+
assert.strictEqual(mockResponse.statusCode, 200);
302+
assert.deepStrictEqual(mockResponse.headers, {
303+
'Content-Type': 'text/plain; charset=utf-8',
304+
});
305+
assert.deepStrictEqual(
306+
mockResponse.writtenChunks.map(chunk => decoder.decode(chunk)),
307+
['0:"Hello"\n', '0:", "\n', '0:"world!"\n'],
308+
);
309+
});
310+
});
311+
263312
describe('result.toTextStreamResponse', () => {
264313
it('should create a Response with a text stream', async () => {
265314
const result = await experimental_streamText({

‎packages/core/core/generate-text/stream-text.ts

+41-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back
2323
import { runToolsTransformation } from './run-tools-transformation';
2424
import { ToToolCall } from './tool-call';
2525
import { ToToolResult } from './tool-result';
26+
import { ServerResponse } from 'node:http';
2627

2728
/**
2829
Generate a text and call tools for a given prompt using a language model.
@@ -218,6 +219,45 @@ Stream callbacks that will be called when the stream emits events.
218219
.pipeThrough(createStreamDataTransformer());
219220
}
220221

222+
/**
223+
Writes stream data output to a Node.js response-like object.
224+
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
225+
writes each text delta as a separate chunk.
226+
227+
@param response A Node.js response-like object (ServerResponse).
228+
@param init Optional headers and status code.
229+
*/
230+
pipeAIStreamToResponse(
231+
response: ServerResponse,
232+
init?: { headers?: Record<string, string>; status?: number },
233+
) {
234+
response.writeHead(init?.status ?? 200, {
235+
'Content-Type': 'text/plain; charset=utf-8',
236+
...init?.headers,
237+
});
238+
239+
const reader = this.textStream
240+
.pipeThrough(createCallbacksTransformer(undefined))
241+
.pipeThrough(createStreamDataTransformer())
242+
.getReader();
243+
244+
const read = async () => {
245+
try {
246+
while (true) {
247+
const { done, value } = await reader.read();
248+
if (done) break;
249+
response.write(value);
250+
}
251+
} catch (error) {
252+
throw error;
253+
} finally {
254+
response.end();
255+
}
256+
};
257+
258+
read();
259+
}
260+
221261
/**
222262
Creates a simple text stream response.
223263
Each text delta is encoded as UTF-8 and sent as a separate chunk.
@@ -234,8 +274,7 @@ Non-text-delta events are ignored.
234274
}),
235275
),
236276
{
237-
...init,
238-
status: 200,
277+
status: init?.status ?? 200,
239278
headers: {
240279
'Content-Type': 'text/plain; charset=utf-8',
241280
...init?.headers,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { ServerResponse } from 'node:http';
2+
3+
class MockServerResponse {
4+
writtenChunks: any[] = [];
5+
headers = {};
6+
statusCode = 0;
7+
ended = false;
8+
9+
write(chunk: any): void {
10+
this.writtenChunks.push(chunk);
11+
}
12+
13+
end(): void {
14+
// You might want to mark the response as ended to simulate the real behavior
15+
this.ended = true;
16+
}
17+
18+
writeHead(statusCode: number, headers: Record<string, string>): void {
19+
this.statusCode = statusCode;
20+
this.headers = headers;
21+
}
22+
23+
get body() {
24+
// Combine all written chunks into a single string
25+
return this.writtenChunks.join('');
26+
}
27+
}
28+
29+
export function createMockServerResponse(): ServerResponse &
30+
MockServerResponse {
31+
return new MockServerResponse() as ServerResponse & MockServerResponse;
32+
}

0 commit comments

Comments
 (0)
Please sign in to comment.