File tree 8 files changed +51
-51
lines changed
examples/ai-core/src/stream-text
8 files changed +51
-51
lines changed Original file line number Diff line number Diff line change
1
+ ---
2
+ ' @ai-sdk/provider-utils ' : patch
3
+ ' ai ' : patch
4
+ ---
5
+
6
+ ai/core: fix abort handling in transformation stream
Original file line number Diff line number Diff line change @@ -5,33 +5,24 @@ import dotenv from 'dotenv';
5
5
dotenv . config ( ) ;
6
6
7
7
async function main ( ) {
8
- const abortController = new AbortController ( ) ;
9
-
10
- // run async:
11
- ( async ( ) => {
12
- await delay ( 1500 ) ; // wait 1.5 seconds
13
- abortController . abort ( ) ; // aborts the streaming
14
- } ) ( ) ;
15
-
16
8
try {
17
9
const { textStream } = await experimental_streamText ( {
18
10
model : openai ( 'gpt-3.5-turbo' ) ,
19
11
prompt : 'Write a short story about a robot learning to love:\n\n' ,
20
- abortSignal : abortController . signal ,
12
+ abortSignal : AbortSignal . timeout ( 3000 ) ,
21
13
} ) ;
22
14
23
15
for await ( const textPart of textStream ) {
24
16
process . stdout . write ( textPart ) ;
25
17
}
26
18
} catch ( error ) {
27
- if ( error instanceof Error && error . name === 'AbortError' ) {
19
+ if (
20
+ error instanceof Error &&
21
+ ( error . name === 'AbortError' || error . name === 'TimeoutError' )
22
+ ) {
28
23
console . log ( '\n\nAbortError: The run was aborted.' ) ;
29
24
}
30
25
}
31
26
}
32
27
33
28
main ( ) . catch ( console . error ) ;
34
-
35
- async function delay ( delayInMs : number ) : Promise < void > {
36
- return new Promise ( resolve => setTimeout ( resolve , delayInMs ) ) ;
37
- }
Original file line number Diff line number Diff line change @@ -162,27 +162,30 @@ export function runToolsTransformation<
162
162
// combine the generator stream and the tool results stream
163
163
return new ReadableStream < TextStreamPart < TOOLS > > ( {
164
164
async start ( controller ) {
165
- generatorStream . pipeThrough ( forwardStream ) . pipeTo (
166
- new WritableStream ( {
167
- write ( chunk ) {
168
- controller . enqueue ( chunk ) ;
169
- } ,
170
- close ( ) {
171
- // the generator stream controller is automatically closed when it's consumed
172
- } ,
173
- } ) ,
174
- ) ;
175
-
176
- toolResultsStream . pipeTo (
177
- new WritableStream ( {
178
- write ( chunk ) {
179
- controller . enqueue ( chunk ) ;
180
- } ,
181
- close ( ) {
182
- controller . close ( ) ;
183
- } ,
184
- } ) ,
185
- ) ;
165
+ // need to wait for both pipes so there are no dangling promises that
166
+ // can cause uncaught promise rejections when the stream is aborted
167
+ return Promise . all ( [
168
+ generatorStream . pipeThrough ( forwardStream ) . pipeTo (
169
+ new WritableStream ( {
170
+ write ( chunk ) {
171
+ controller . enqueue ( chunk ) ;
172
+ } ,
173
+ close ( ) {
174
+ // the generator stream controller is automatically closed when it's consumed
175
+ } ,
176
+ } ) ,
177
+ ) ,
178
+ toolResultsStream . pipeTo (
179
+ new WritableStream ( {
180
+ write ( chunk ) {
181
+ controller . enqueue ( chunk ) ;
182
+ } ,
183
+ close ( ) {
184
+ controller . close ( ) ;
185
+ } ,
186
+ } ) ,
187
+ ) ,
188
+ ] ) ;
186
189
} ,
187
190
} ) ;
188
191
}
Original file line number Diff line number Diff line change 4
4
LanguageModelV1FinishReason ,
5
5
LanguageModelV1LogProbs ,
6
6
} from '@ai-sdk/provider' ;
7
+ import { ServerResponse } from 'node:http' ;
7
8
import {
8
9
AIStreamCallbacksAndOptions ,
9
- StreamData ,
10
10
StreamingTextResponse ,
11
11
createCallbacksTransformer ,
12
12
createStreamDataTransformer ,
@@ -26,7 +26,6 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back
26
26
import { runToolsTransformation } from './run-tools-transformation' ;
27
27
import { ToToolCall } from './tool-call' ;
28
28
import { ToToolResult } from './tool-result' ;
29
- import { ServerResponse } from 'node:http' ;
30
29
31
30
/**
32
31
Generate a text and call tools for a given prompt using a language model.
Original file line number Diff line number Diff line change 1
1
import { APICallError , RetryError } from '@ai-sdk/provider' ;
2
- import { getErrorMessage } from '@ai-sdk/provider-utils' ;
2
+ import { getErrorMessage , isAbortError } from '@ai-sdk/provider-utils' ;
3
3
import { delay } from './delay' ;
4
4
5
5
export type RetryFunction = < OUTPUT > (
@@ -35,7 +35,7 @@ async function _retryWithExponentialBackoff<OUTPUT>(
35
35
try {
36
36
return await f ( ) ;
37
37
} catch ( error ) {
38
- if ( error instanceof Error && error . name === 'AbortError' ) {
38
+ if ( isAbortError ( error ) ) {
39
39
throw error ; // don't retry when the request was aborted
40
40
}
41
41
Original file line number Diff line number Diff line change 1
1
export * from './extract-response-headers' ;
2
2
export * from './generate-id' ;
3
3
export * from './get-error-message' ;
4
+ export * from './is-abort-error' ;
4
5
export * from './load-api-key' ;
5
6
export * from './parse-json' ;
6
7
export * from './post-to-api' ;
Original file line number Diff line number Diff line change
1
+ export function isAbortError ( error : unknown ) : error is DOMException {
2
+ return (
3
+ error instanceof DOMException &&
4
+ ( error . name === 'AbortError' || error . name === 'TimeoutError' )
5
+ ) ;
6
+ }
Original file line number Diff line number Diff line change 1
1
import { APICallError } from '@ai-sdk/provider' ;
2
+ import { isAbortError } from './is-abort-error' ;
2
3
import { ResponseHandler } from './response-handler' ;
3
4
4
5
export const postJsonToApi = async < T > ( {
@@ -70,13 +71,8 @@ export const postToApi = async <T>({
70
71
requestBodyValues : body . values ,
71
72
} ) ;
72
73
} catch ( error ) {
73
- if ( error instanceof Error ) {
74
- if (
75
- error . name === 'AbortError' ||
76
- APICallError . isAPICallError ( error )
77
- ) {
78
- throw error ;
79
- }
74
+ if ( isAbortError ( error ) || APICallError . isAPICallError ( error ) ) {
75
+ throw error ;
80
76
}
81
77
82
78
throw new APICallError ( {
@@ -97,7 +93,7 @@ export const postToApi = async <T>({
97
93
} ) ;
98
94
} catch ( error ) {
99
95
if ( error instanceof Error ) {
100
- if ( error . name === 'AbortError' || APICallError . isAPICallError ( error ) ) {
96
+ if ( isAbortError ( error ) || APICallError . isAPICallError ( error ) ) {
101
97
throw error ;
102
98
}
103
99
}
@@ -111,10 +107,8 @@ export const postToApi = async <T>({
111
107
} ) ;
112
108
}
113
109
} catch ( error ) {
114
- if ( error instanceof Error ) {
115
- if ( error . name === 'AbortError' ) {
116
- throw error ;
117
- }
110
+ if ( isAbortError ( error ) ) {
111
+ throw error ;
118
112
}
119
113
120
114
// unwrap original error when fetch failed (for easier debugging):
You can’t perform that action at this time.
0 commit comments