@@ -7,80 +7,33 @@ import { JSONValue } from '../shared/types';
7
7
export class StreamData {
8
8
private encoder = new TextEncoder ( ) ;
9
9
10
- private controller : TransformStreamDefaultController < Uint8Array > | null =
11
- null ;
12
- public stream : TransformStream < Uint8Array , Uint8Array > ;
13
-
14
- // closing the stream is synchronous, but we want to return a promise
15
- // in case we're doing async work
16
- private isClosedPromise : Promise < void > | null = null ;
17
- private isClosedPromiseResolver : undefined | ( ( ) => void ) = undefined ;
18
- private isClosed : boolean = false ;
10
+ private controller : ReadableStreamController < Uint8Array > | null = null ;
11
+ public stream : ReadableStream < Uint8Array > ;
19
12
20
- // array to store appended data
21
- private data : JSONValue [ ] = [ ] ;
22
- private messageAnnotations : JSONValue [ ] = [ ] ;
13
+ private isClosed : boolean = false ;
14
+ private warningTimeout : NodeJS . Timeout | null = null ;
23
15
24
16
constructor ( ) {
25
- this . isClosedPromise = new Promise ( resolve => {
26
- this . isClosedPromiseResolver = resolve ;
27
- } ) ;
28
-
29
17
const self = this ;
30
- this . stream = new TransformStream ( {
18
+
19
+ this . stream = new ReadableStream ( {
31
20
start : async controller => {
32
21
self . controller = controller ;
33
- } ,
34
- transform : async ( chunk , controller ) => {
35
- // add buffered data to the stream
36
- if ( self . data . length > 0 ) {
37
- const encodedData = self . encoder . encode (
38
- formatStreamPart ( 'data' , self . data ) ,
39
- ) ;
40
- self . data = [ ] ;
41
- controller . enqueue ( encodedData ) ;
42
- }
43
22
44
- if ( self . messageAnnotations . length ) {
45
- const encodedMessageAnnotations = self . encoder . encode (
46
- formatStreamPart ( 'message_annotations' , self . messageAnnotations ) ,
47
- ) ;
48
- self . messageAnnotations = [ ] ;
49
- controller . enqueue ( encodedMessageAnnotations ) ;
23
+ // Set a timeout to show a warning if the stream is not closed within 3 seconds
24
+ if ( process . env . NODE_ENV === 'development' ) {
25
+ self . warningTimeout = setTimeout ( ( ) => {
26
+ console . warn (
27
+ 'The data stream is hanging. Did you forget to close it with `data.close()`?' ,
28
+ ) ;
29
+ } , 3000 ) ;
50
30
}
51
-
52
- controller . enqueue ( chunk ) ;
53
31
} ,
54
- async flush ( controller ) {
55
- // Show a warning during dev if the data stream is hanging after 3 seconds.
56
- const warningTimeout =
57
- process . env . NODE_ENV === 'development'
58
- ? setTimeout ( ( ) => {
59
- console . warn (
60
- 'The data stream is hanging. Did you forget to close it with `data.close()`?' ,
61
- ) ;
62
- } , 3000 )
63
- : null ;
64
-
65
- await self . isClosedPromise ;
66
-
67
- if ( warningTimeout !== null ) {
68
- clearTimeout ( warningTimeout ) ;
69
- }
70
-
71
- if ( self . data . length ) {
72
- const encodedData = self . encoder . encode (
73
- formatStreamPart ( 'data' , self . data ) ,
74
- ) ;
75
- controller . enqueue ( encodedData ) ;
76
- }
77
-
78
- if ( self . messageAnnotations . length ) {
79
- const encodedData = self . encoder . encode (
80
- formatStreamPart ( 'message_annotations' , self . messageAnnotations ) ,
81
- ) ;
82
- controller . enqueue ( encodedData ) ;
83
- }
32
+ pull : controller => {
33
+ // No-op: we don't need to do anything special on pull
34
+ } ,
35
+ cancel : reason => {
36
+ this . isClosed = true ;
84
37
} ,
85
38
} ) ;
86
39
}
@@ -94,24 +47,41 @@ export class StreamData {
94
47
throw new Error ( 'Stream controller is not initialized.' ) ;
95
48
}
96
49
97
- this . isClosedPromiseResolver ?. ( ) ;
50
+ this . controller . close ( ) ;
98
51
this . isClosed = true ;
52
+
53
+ // Clear the warning timeout if the stream is closed
54
+ if ( this . warningTimeout ) {
55
+ clearTimeout ( this . warningTimeout ) ;
56
+ }
99
57
}
100
58
101
59
append ( value : JSONValue ) : void {
102
60
if ( this . isClosed ) {
103
61
throw new Error ( 'Data Stream has already been closed.' ) ;
104
62
}
105
63
106
- this . data . push ( value ) ;
64
+ if ( ! this . controller ) {
65
+ throw new Error ( 'Stream controller is not initialized.' ) ;
66
+ }
67
+
68
+ this . controller . enqueue (
69
+ this . encoder . encode ( formatStreamPart ( 'data' , [ value ] ) ) ,
70
+ ) ;
107
71
}
108
72
109
73
appendMessageAnnotation ( value : JSONValue ) : void {
110
74
if ( this . isClosed ) {
111
75
throw new Error ( 'Data Stream has already been closed.' ) ;
112
76
}
113
77
114
- this . messageAnnotations . push ( value ) ;
78
+ if ( ! this . controller ) {
79
+ throw new Error ( 'Stream controller is not initialized.' ) ;
80
+ }
81
+
82
+ this . controller . enqueue (
83
+ this . encoder . encode ( formatStreamPart ( 'message_annotations' , [ value ] ) ) ,
84
+ ) ;
115
85
}
116
86
}
117
87
0 commit comments