ai/core: fix uncaught abort handling exception (#1451)
lgrammel committed Apr 26, 2024
1 parent 1bc9916 commit 56ef84a
Showing 8 changed files with 51 additions and 51 deletions.
6 changes: 6 additions & 0 deletions .changeset/quiet-wasps-tell.md
@@ -0,0 +1,6 @@
+---
+'@ai-sdk/provider-utils': patch
+'ai': patch
+---
+
+ai/core: fix abort handling in transformation stream
19 changes: 5 additions & 14 deletions examples/ai-core/src/stream-text/openai-abort.ts
@@ -5,33 +5,24 @@ import dotenv from 'dotenv';
 dotenv.config();
 
 async function main() {
-  const abortController = new AbortController();
-
-  // run async:
-  (async () => {
-    await delay(1500); // wait 1.5 seconds
-    abortController.abort(); // aborts the streaming
-  })();
-
   try {
     const { textStream } = await experimental_streamText({
       model: openai('gpt-3.5-turbo'),
       prompt: 'Write a short story about a robot learning to love:\n\n',
-      abortSignal: abortController.signal,
+      abortSignal: AbortSignal.timeout(3000),
     });
 
     for await (const textPart of textStream) {
       process.stdout.write(textPart);
     }
   } catch (error) {
-    if (error instanceof Error && error.name === 'AbortError') {
+    if (
+      error instanceof Error &&
+      (error.name === 'AbortError' || error.name === 'TimeoutError')
+    ) {
       console.log('\n\nAbortError: The run was aborted.');
     }
   }
 }
 
 main().catch(console.error);
-
-async function delay(delayInMs: number): Promise<void> {
-  return new Promise(resolve => setTimeout(resolve, delayInMs));
-}
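
Note on the example change: AbortSignal.timeout() produces a signal whose abort reason is a DOMException named 'TimeoutError', while a plain AbortController.abort() defaults to 'AbortError', which is why the catch block now checks both names. A tiny standalone sketch of that behavior (Node 17.3+ or a modern browser, outside the SDK):

// Sketch: the two abort reason names the updated example handles.
const controller = new AbortController();
controller.abort();
console.log((controller.signal.reason as DOMException).name); // 'AbortError'

const timeoutSignal = AbortSignal.timeout(10);
timeoutSignal.addEventListener('abort', () => {
  console.log((timeoutSignal.reason as DOMException).name); // 'TimeoutError'
});
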
45 changes: 24 additions & 21 deletions packages/core/core/generate-text/run-tools-transformation.ts
@@ -162,27 +162,30 @@ export function runToolsTransformation<
   // combine the generator stream and the tool results stream
   return new ReadableStream<TextStreamPart<TOOLS>>({
     async start(controller) {
-      generatorStream.pipeThrough(forwardStream).pipeTo(
-        new WritableStream({
-          write(chunk) {
-            controller.enqueue(chunk);
-          },
-          close() {
-            // the generator stream controller is automatically closed when it's consumed
-          },
-        }),
-      );
-
-      toolResultsStream.pipeTo(
-        new WritableStream({
-          write(chunk) {
-            controller.enqueue(chunk);
-          },
-          close() {
-            controller.close();
-          },
-        }),
-      );
+      // need to wait for both pipes so there are no dangling promises that
+      // can cause uncaught promise rejections when the stream is aborted
+      return Promise.all([
+        generatorStream.pipeThrough(forwardStream).pipeTo(
+          new WritableStream({
+            write(chunk) {
+              controller.enqueue(chunk);
+            },
+            close() {
+              // the generator stream controller is automatically closed when it's consumed
+            },
+          }),
+        ),
+        toolResultsStream.pipeTo(
+          new WritableStream({
+            write(chunk) {
+              controller.enqueue(chunk);
+            },
+            close() {
+              controller.close();
+            },
+          }),
+        ),
+      ]);
     },
   });
 }
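
This hunk is the heart of the fix: the two pipeTo() calls used to be fire-and-forget, so when an abort errored the source streams their rejections had no handler and surfaced as uncaught exceptions. Returning Promise.all([...]) from start() ties both pipes to the combined stream's lifecycle, so a rejection errors the stream instead of escaping. A simplified, self-contained sketch of the same pattern (illustrative names, not the SDK's helper):

// mergeStreams: forward chunks from two sources into one ReadableStream.
// Returning Promise.all from start() means an aborted or failed pipe rejects
// through the merged stream rather than as an unhandled promise rejection.
function mergeStreams<T>(
  a: ReadableStream<T>,
  b: ReadableStream<T>,
): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      let openSources = 2;
      const sink = () =>
        new WritableStream<T>({
          write(chunk) {
            controller.enqueue(chunk);
          },
          close() {
            // close the merged stream only after both sources finish
            if (--openSources === 0) controller.close();
          },
        });

      return Promise.all([a.pipeTo(sink()), b.pipeTo(sink())]);
    },
  });
}
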
3 changes: 1 addition & 2 deletions packages/core/core/generate-text/stream-text.ts
@@ -4,9 +4,9 @@ import {
   LanguageModelV1FinishReason,
   LanguageModelV1LogProbs,
 } from '@ai-sdk/provider';
+import { ServerResponse } from 'node:http';
 import {
   AIStreamCallbacksAndOptions,
-  StreamData,
   StreamingTextResponse,
   createCallbacksTransformer,
   createStreamDataTransformer,
@@ -26,7 +26,6 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-backoff';
 import { runToolsTransformation } from './run-tools-transformation';
 import { ToToolCall } from './tool-call';
 import { ToToolResult } from './tool-result';
-import { ServerResponse } from 'node:http';
 
 /**
 Generate a text and call tools for a given prompt using a language model.
4 changes: 2 additions & 2 deletions packages/core/core/util/retry-with-exponential-backoff.ts
@@ -1,5 +1,5 @@
 import { APICallError, RetryError } from '@ai-sdk/provider';
-import { getErrorMessage } from '@ai-sdk/provider-utils';
+import { getErrorMessage, isAbortError } from '@ai-sdk/provider-utils';
 import { delay } from './delay';
 
 export type RetryFunction = <OUTPUT>(
@@ -35,7 +35,7 @@ async function _retryWithExponentialBackoff<OUTPUT>(
   try {
     return await f();
   } catch (error) {
-    if (error instanceof Error && error.name === 'AbortError') {
+    if (isAbortError(error)) {
       throw error; // don't retry when the request was aborted
     }
 
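
For context, the guard above sits inside the SDK's retry loop; the overall shape is roughly the following (a condensed sketch with assumed parameter names, not the SDK's exact implementation, which additionally wraps exhausted attempts in a RetryError):

import { isAbortError } from '@ai-sdk/provider-utils';

// Condensed sketch: retry with exponential backoff, but never retry a call
// that the caller aborted or that timed out.
async function retryWithBackoff<OUTPUT>(
  f: () => PromiseLike<OUTPUT>,
  { maxRetries = 2, initialDelayInMs = 2000, backoffFactor = 2 } = {},
): Promise<OUTPUT> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await f();
    } catch (error) {
      // same rule as above: an aborted request is rethrown, never retried
      if (isAbortError(error) || attempt >= maxRetries) {
        throw error;
      }
      await new Promise(resolve =>
        setTimeout(resolve, initialDelayInMs * backoffFactor ** attempt),
      );
    }
  }
}
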
1 change: 1 addition & 0 deletions packages/provider-utils/src/index.ts
@@ -1,6 +1,7 @@
 export * from './extract-response-headers';
 export * from './generate-id';
 export * from './get-error-message';
+export * from './is-abort-error';
 export * from './load-api-key';
 export * from './parse-json';
 export * from './post-to-api';
6 changes: 6 additions & 0 deletions packages/provider-utils/src/is-abort-error.ts
@@ -0,0 +1,6 @@
+export function isAbortError(error: unknown): error is DOMException {
+  return (
+    error instanceof DOMException &&
+    (error.name === 'AbortError' || error.name === 'TimeoutError')
+  );
+}
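
A quick usage sketch of the new type guard (plain fetch on Node 18+; the URL and wrapper function are illustrative). It matches when the runtime reports the failure as a DOMException, which is what fetch typically throws both for a manual abort and for an expired AbortSignal.timeout():

import { isAbortError } from '@ai-sdk/provider-utils';

async function fetchWithTimeout(url: string): Promise<Response | undefined> {
  try {
    return await fetch(url, { signal: AbortSignal.timeout(5000) });
  } catch (error) {
    if (isAbortError(error)) {
      // covers 'AbortError' (manual abort) and 'TimeoutError' (AbortSignal.timeout)
      console.log('request aborted or timed out');
      return undefined;
    }
    throw error;
  }
}
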
18 changes: 6 additions & 12 deletions packages/provider-utils/src/post-to-api.ts
@@ -1,4 +1,5 @@
 import { APICallError } from '@ai-sdk/provider';
+import { isAbortError } from './is-abort-error';
 import { ResponseHandler } from './response-handler';
 
 export const postJsonToApi = async <T>({
@@ -70,13 +71,8 @@ export const postToApi = async <T>({
           requestBodyValues: body.values,
         });
       } catch (error) {
-        if (error instanceof Error) {
-          if (
-            error.name === 'AbortError' ||
-            APICallError.isAPICallError(error)
-          ) {
-            throw error;
-          }
+        if (isAbortError(error) || APICallError.isAPICallError(error)) {
+          throw error;
         }
 
         throw new APICallError({
@@ -97,7 +93,7 @@ export const postToApi = async <T>({
       });
     } catch (error) {
       if (error instanceof Error) {
-        if (error.name === 'AbortError' || APICallError.isAPICallError(error)) {
+        if (isAbortError(error) || APICallError.isAPICallError(error)) {
           throw error;
         }
       }
@@ -111,10 +107,8 @@ export const postToApi = async <T>({
       });
     }
   } catch (error) {
-    if (error instanceof Error) {
-      if (error.name === 'AbortError') {
-        throw error;
-      }
+    if (isAbortError(error)) {
+      throw error;
     }
 
     // unwrap original error when fetch failed (for easier debugging):
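
Taken together, the first two catch blocks above now apply a single rule: abort/timeout errors and already-constructed APICallErrors are rethrown untouched, and everything else is wrapped in an APICallError; the outermost catch only lets aborts through before unwrapping failed fetches. A condensed sketch of that rethrow-or-wrap rule (rethrowOrWrap is a hypothetical helper for illustration, not code from this file; the wrapped error's fields follow the pattern visible in the hunks above):

import { APICallError } from '@ai-sdk/provider';
import { isAbortError } from '@ai-sdk/provider-utils';

// Hypothetical helper illustrating the shared classification above.
function rethrowOrWrap(
  error: unknown,
  url: string,
  requestBodyValues: unknown,
): never {
  // pass through aborts/timeouts and errors that are already APICallErrors
  if (isAbortError(error) || APICallError.isAPICallError(error)) {
    throw error;
  }

  // wrap anything else so callers get a consistent error type
  throw new APICallError({
    message: error instanceof Error ? error.message : 'unknown error',
    cause: error,
    url,
    requestBodyValues,
  });
}
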
