Skip to content

Commit 56ef84a

Browse files
authoredApr 26, 2024··
ai/core: fix uncaught abort handling exception (#1451)
1 parent 1bc9916 commit 56ef84a

File tree

8 files changed

+51
-51
lines changed

8 files changed

+51
-51
lines changed
 

‎.changeset/quiet-wasps-tell.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@ai-sdk/provider-utils': patch
3+
'ai': patch
4+
---
5+
6+
ai/core: fix abort handling in transformation stream

‎examples/ai-core/src/stream-text/openai-abort.ts

+5-14
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,24 @@ import dotenv from 'dotenv';
55
dotenv.config();
66

77
async function main() {
8-
const abortController = new AbortController();
9-
10-
// run async:
11-
(async () => {
12-
await delay(1500); // wait 1.5 seconds
13-
abortController.abort(); // aborts the streaming
14-
})();
15-
168
try {
179
const { textStream } = await experimental_streamText({
1810
model: openai('gpt-3.5-turbo'),
1911
prompt: 'Write a short story about a robot learning to love:\n\n',
20-
abortSignal: abortController.signal,
12+
abortSignal: AbortSignal.timeout(3000),
2113
});
2214

2315
for await (const textPart of textStream) {
2416
process.stdout.write(textPart);
2517
}
2618
} catch (error) {
27-
if (error instanceof Error && error.name === 'AbortError') {
19+
if (
20+
error instanceof Error &&
21+
(error.name === 'AbortError' || error.name === 'TimeoutError')
22+
) {
2823
console.log('\n\nAbortError: The run was aborted.');
2924
}
3025
}
3126
}
3227

3328
main().catch(console.error);
34-
35-
async function delay(delayInMs: number): Promise<void> {
36-
return new Promise(resolve => setTimeout(resolve, delayInMs));
37-
}

‎packages/core/core/generate-text/run-tools-transformation.ts

+24-21
Original file line numberDiff line numberDiff line change
@@ -162,27 +162,30 @@ export function runToolsTransformation<
162162
// combine the generator stream and the tool results stream
163163
return new ReadableStream<TextStreamPart<TOOLS>>({
164164
async start(controller) {
165-
generatorStream.pipeThrough(forwardStream).pipeTo(
166-
new WritableStream({
167-
write(chunk) {
168-
controller.enqueue(chunk);
169-
},
170-
close() {
171-
// the generator stream controller is automatically closed when it's consumed
172-
},
173-
}),
174-
);
175-
176-
toolResultsStream.pipeTo(
177-
new WritableStream({
178-
write(chunk) {
179-
controller.enqueue(chunk);
180-
},
181-
close() {
182-
controller.close();
183-
},
184-
}),
185-
);
165+
// need to wait for both pipes so there are no dangling promises that
166+
// can cause uncaught promise rejections when the stream is aborted
167+
return Promise.all([
168+
generatorStream.pipeThrough(forwardStream).pipeTo(
169+
new WritableStream({
170+
write(chunk) {
171+
controller.enqueue(chunk);
172+
},
173+
close() {
174+
// the generator stream controller is automatically closed when it's consumed
175+
},
176+
}),
177+
),
178+
toolResultsStream.pipeTo(
179+
new WritableStream({
180+
write(chunk) {
181+
controller.enqueue(chunk);
182+
},
183+
close() {
184+
controller.close();
185+
},
186+
}),
187+
),
188+
]);
186189
},
187190
});
188191
}

‎packages/core/core/generate-text/stream-text.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import {
44
LanguageModelV1FinishReason,
55
LanguageModelV1LogProbs,
66
} from '@ai-sdk/provider';
7+
import { ServerResponse } from 'node:http';
78
import {
89
AIStreamCallbacksAndOptions,
9-
StreamData,
1010
StreamingTextResponse,
1111
createCallbacksTransformer,
1212
createStreamDataTransformer,
@@ -26,7 +26,6 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back
2626
import { runToolsTransformation } from './run-tools-transformation';
2727
import { ToToolCall } from './tool-call';
2828
import { ToToolResult } from './tool-result';
29-
import { ServerResponse } from 'node:http';
3029

3130
/**
3231
Generate a text and call tools for a given prompt using a language model.

‎packages/core/core/util/retry-with-exponential-backoff.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { APICallError, RetryError } from '@ai-sdk/provider';
2-
import { getErrorMessage } from '@ai-sdk/provider-utils';
2+
import { getErrorMessage, isAbortError } from '@ai-sdk/provider-utils';
33
import { delay } from './delay';
44

55
export type RetryFunction = <OUTPUT>(
@@ -35,7 +35,7 @@ async function _retryWithExponentialBackoff<OUTPUT>(
3535
try {
3636
return await f();
3737
} catch (error) {
38-
if (error instanceof Error && error.name === 'AbortError') {
38+
if (isAbortError(error)) {
3939
throw error; // don't retry when the request was aborted
4040
}
4141

‎packages/provider-utils/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export * from './extract-response-headers';
22
export * from './generate-id';
33
export * from './get-error-message';
4+
export * from './is-abort-error';
45
export * from './load-api-key';
56
export * from './parse-json';
67
export * from './post-to-api';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export function isAbortError(error: unknown): error is DOMException {
2+
return (
3+
error instanceof DOMException &&
4+
(error.name === 'AbortError' || error.name === 'TimeoutError')
5+
);
6+
}

‎packages/provider-utils/src/post-to-api.ts

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { APICallError } from '@ai-sdk/provider';
2+
import { isAbortError } from './is-abort-error';
23
import { ResponseHandler } from './response-handler';
34

45
export const postJsonToApi = async <T>({
@@ -70,13 +71,8 @@ export const postToApi = async <T>({
7071
requestBodyValues: body.values,
7172
});
7273
} catch (error) {
73-
if (error instanceof Error) {
74-
if (
75-
error.name === 'AbortError' ||
76-
APICallError.isAPICallError(error)
77-
) {
78-
throw error;
79-
}
74+
if (isAbortError(error) || APICallError.isAPICallError(error)) {
75+
throw error;
8076
}
8177

8278
throw new APICallError({
@@ -97,7 +93,7 @@ export const postToApi = async <T>({
9793
});
9894
} catch (error) {
9995
if (error instanceof Error) {
100-
if (error.name === 'AbortError' || APICallError.isAPICallError(error)) {
96+
if (isAbortError(error) || APICallError.isAPICallError(error)) {
10197
throw error;
10298
}
10399
}
@@ -111,10 +107,8 @@ export const postToApi = async <T>({
111107
});
112108
}
113109
} catch (error) {
114-
if (error instanceof Error) {
115-
if (error.name === 'AbortError') {
116-
throw error;
117-
}
110+
if (isAbortError(error)) {
111+
throw error;
118112
}
119113

120114
// unwrap original error when fetch failed (for easier debugging):

0 commit comments

Comments
 (0)
Please sign in to comment.