@@ -37,6 +37,12 @@ type LangChainAIMessageChunk = {
37
37
content : LangChainMessageContent ;
38
38
} ;
39
39
40
+ // LC stream event v2
41
+ type LangChainStreamEvent = {
42
+ event : string ;
43
+ data : any ;
44
+ } ;
45
+
40
46
/**
41
47
Converts LangChain output streams to AIStream.
42
48
@@ -45,28 +51,57 @@ The following streams are supported:
45
51
- `string` streams (LangChain `StringOutputParser` output)
46
52
*/
47
53
export function toAIStream (
48
- stream : ReadableStream < LangChainAIMessageChunk > | ReadableStream < string > ,
54
+ stream :
55
+ | ReadableStream < LangChainStreamEvent >
56
+ | ReadableStream < LangChainAIMessageChunk >
57
+ | ReadableStream < string > ,
49
58
callbacks ?: AIStreamCallbacksAndOptions ,
50
59
) {
51
60
return stream
52
61
. pipeThrough (
53
- new TransformStream < LangChainAIMessageChunk | string > ( {
54
- transform : async ( chunk , controller ) => {
55
- if ( typeof chunk === 'string' ) {
56
- controller . enqueue ( chunk ) ;
57
- } else if ( typeof chunk . content === 'string' ) {
58
- controller . enqueue ( chunk . content ) ;
59
- } else {
60
- const content : LangChainMessageContentComplex [ ] = chunk . content ;
61
- for ( const item of content ) {
62
- if ( item . type === 'text' ) {
63
- controller . enqueue ( item . text ) ;
64
- }
62
+ new TransformStream <
63
+ LangChainStreamEvent | LangChainAIMessageChunk | string
64
+ > ( {
65
+ transform : async ( value , controller ) => {
66
+ // text stream:
67
+ if ( typeof value === 'string' ) {
68
+ controller . enqueue ( value ) ;
69
+ return ;
70
+ }
71
+
72
+ // LC stream events v2:
73
+ if ( 'event' in value ) {
74
+ // chunk is AIMessage Chunk for on_chat_model_stream event:
75
+ if ( value . event === 'on_chat_model_stream' ) {
76
+ forwardAIMessageChunk (
77
+ value . data ?. chunk as LangChainAIMessageChunk ,
78
+ controller ,
79
+ ) ;
65
80
}
81
+ return ;
66
82
}
83
+
84
+ // AI Message chunk stream:
85
+ forwardAIMessageChunk ( value , controller ) ;
67
86
} ,
68
87
} ) ,
69
88
)
70
89
. pipeThrough ( createCallbacksTransformer ( callbacks ) )
71
90
. pipeThrough ( createStreamDataTransformer ( ) ) ;
72
91
}
92
+
93
+ function forwardAIMessageChunk (
94
+ chunk : LangChainAIMessageChunk ,
95
+ controller : TransformStreamDefaultController < any > ,
96
+ ) {
97
+ if ( typeof chunk . content === 'string' ) {
98
+ controller . enqueue ( chunk . content ) ;
99
+ } else {
100
+ const content : LangChainMessageContentComplex [ ] = chunk . content ;
101
+ for ( const item of content ) {
102
+ if ( item . type === 'text' ) {
103
+ controller . enqueue ( item . text ) ;
104
+ }
105
+ }
106
+ }
107
+ }
0 commit comments