Skip to content

Commit

Permalink
ai/core: add pipeAIStreamToResponse to streamText result. (#1404)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgrammel committed Apr 23, 2024
1 parent dc407d8 commit f90f6a1
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/eight-roses-clean.md
@@ -0,0 +1,5 @@
---
'ai': patch
---

ai/core: add pipeAIStreamToResponse() to streamText result.
13 changes: 4 additions & 9 deletions examples/next-openai-pages/pages/api/chat-node.ts
@@ -1,5 +1,5 @@
import { createOpenAI } from '@ai-sdk/openai';
import { experimental_streamText, streamToResponse } from 'ai';
import { experimental_streamText } from 'ai';
import { NextApiRequest, NextApiResponse } from 'next';

// Create an OpenAI Provider instance
Expand All @@ -19,12 +19,7 @@ export default async function handler(
messages,
});

// Convert the response into a friendly text-stream
const stream = result.toAIStream();

/**
* Converts the stream to a Node.js Response-like object.
* Please note that this sends the response as one message once it's done.
*/
return streamToResponse(stream, res);
// write the AI stream to the response
// (each stream part is written to the response as a separate chunk as it arrives)
result.pipeAIStreamToResponse(res);
}
49 changes: 49 additions & 0 deletions packages/core/core/generate-text/stream-text.test.ts
Expand Up @@ -5,6 +5,8 @@ import { convertAsyncIterableToArray } from '../test/convert-async-iterable-to-a
import { convertReadableStreamToArray } from '../test/convert-readable-stream-to-array';
import { MockLanguageModelV1 } from '../test/mock-language-model-v1';
import { experimental_streamText } from './stream-text';
import { ServerResponse } from 'node:http';
import { createMockServerResponse } from '../test/mock-server-response';

describe('result.textStream', () => {
it('should send text deltas', async () => {
Expand Down Expand Up @@ -260,6 +262,53 @@ describe('result.toAIStream', () => {
});
});

describe('result.pipeAIStreamToResponse', () => {
  it('should write text deltas to a Node.js response-like object', async () => {
    const mockResponse = createMockServerResponse();

    const result = await experimental_streamText({
      model: new MockLanguageModelV1({
        doStream: async () => {
          return {
            stream: convertArrayToReadableStream([
              { type: 'text-delta', textDelta: 'Hello' },
              { type: 'text-delta', textDelta: ', ' },
              { type: 'text-delta', textDelta: 'world!' },
            ]),
            rawCall: { rawPrompt: 'prompt', rawSettings: {} },
          };
        },
      }),
      prompt: 'test-input',
    });

    result.pipeAIStreamToResponse(mockResponse);

    // pipeAIStreamToResponse writes asynchronously and returns no promise,
    // so poll (via setImmediate) until the mock response has been ended.
    await new Promise<void>(resolve => {
      const checkIfEnded = () => {
        if (mockResponse.ended) {
          resolve();
        } else {
          setImmediate(checkIfEnded);
        }
      };
      checkIfEnded();
    });

    const decoder = new TextDecoder();

    assert.strictEqual(mockResponse.statusCode, 200);
    assert.deepStrictEqual(mockResponse.headers, {
      'Content-Type': 'text/plain; charset=utf-8',
    });
    // Each text delta is expected as a separate stream-data-formatted chunk.
    assert.deepStrictEqual(
      mockResponse.writtenChunks.map(chunk => decoder.decode(chunk)),
      ['0:"Hello"\n', '0:", "\n', '0:"world!"\n'],
    );
  });
});

describe('result.toTextStreamResponse', () => {
it('should create a Response with a text stream', async () => {
const result = await experimental_streamText({
Expand Down
43 changes: 41 additions & 2 deletions packages/core/core/generate-text/stream-text.ts
Expand Up @@ -23,6 +23,7 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back
import { runToolsTransformation } from './run-tools-transformation';
import { ToToolCall } from './tool-call';
import { ToToolResult } from './tool-result';
import { ServerResponse } from 'node:http';

/**
Generate a text and call tools for a given prompt using a language model.
Expand Down Expand Up @@ -218,6 +219,45 @@ Stream callbacks that will be called when the stream emits events.
.pipeThrough(createStreamDataTransformer());
}

/**
Writes stream data output to a Node.js response-like object.
It sets a `Content-Type` header to `text/plain; charset=utf-8` and
writes each text delta as a separate chunk.

The stream is drained in the background (fire-and-forget); the response
is always ended, even when reading from the stream fails.

@param response A Node.js response-like object (ServerResponse).
@param init Optional headers and status code.
*/
pipeAIStreamToResponse(
  response: ServerResponse,
  init?: { headers?: Record<string, string>; status?: number },
) {
  response.writeHead(init?.status ?? 200, {
    'Content-Type': 'text/plain; charset=utf-8',
    ...init?.headers,
  });

  const reader = this.textStream
    .pipeThrough(createCallbacksTransformer(undefined))
    .pipeThrough(createStreamDataTransformer())
    .getReader();

  const read = async () => {
    // No catch-and-rethrow: a read error propagates unchanged, and the
    // finally clause guarantees the response is ended either way.
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        response.write(value);
      }
    } finally {
      response.end();
    }
  };

  read();
}

/**
Creates a simple text stream response.
Each text delta is encoded as UTF-8 and sent as a separate chunk.
Expand All @@ -234,8 +274,7 @@ Non-text-delta events are ignored.
}),
),
{
...init,
status: 200,
status: init?.status ?? 200,
headers: {
'Content-Type': 'text/plain; charset=utf-8',
...init?.headers,
Expand Down
32 changes: 32 additions & 0 deletions packages/core/core/test/mock-server-response.ts
@@ -0,0 +1,32 @@
import { ServerResponse } from 'node:http';

/**
 * Minimal in-memory stand-in for Node's `ServerResponse`, recording
 * everything written to it so tests can assert on status, headers, and body.
 */
class MockServerResponse {
  // Chunks passed to `write`, in call order. Typically Uint8Array or string.
  writtenChunks: any[] = [];
  headers = {};
  statusCode = 0;
  ended = false;

  write(chunk: any): boolean {
    this.writtenChunks.push(chunk);
    // Mirror Writable.write's boolean return; the mock has no backpressure.
    return true;
  }

  end(): void {
    // Mark the response as ended to simulate the real behavior.
    this.ended = true;
  }

  writeHead(statusCode: number, headers: Record<string, string>): void {
    this.statusCode = statusCode;
    this.headers = headers;
  }

  get body() {
    // Decode binary chunks before joining: Array.prototype.join would
    // stringify a Uint8Array as a comma-separated number list otherwise.
    const decoder = new TextDecoder();
    return this.writtenChunks
      .map(chunk => (typeof chunk === 'string' ? chunk : decoder.decode(chunk)))
      .join('');
  }
}

/**
 * Creates a fresh mock response for use where a real `ServerResponse`
 * is expected. The mock implements only the subset of the interface the
 * tests exercise, so the intersection type is asserted for convenience.
 */
export function createMockServerResponse(): ServerResponse &
  MockServerResponse {
  const response = new MockServerResponse();
  return response as ServerResponse & MockServerResponse;
}

0 comments on commit f90f6a1

Please sign in to comment.