Skip to content

Commit b86af09

Browse files
authoredJul 19, 2024··
feat (ai/core): add langchain stream event v2 support to LangChainAdapter (#2340)
1 parent ba33f32 commit b86af09

File tree

4 files changed

+73
-13
lines changed

4 files changed

+73
-13
lines changed
 

Diff for: ‎.changeset/tiny-plants-press.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'ai': patch
3+
---
4+
5+
feat (ai/core): add langchain stream event v2 support to LangChainAdapter

Diff for: ‎content/docs/07-reference/stream-helpers/16-langchain-adapter.mdx

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ description: API Reference for LangChainAdapter.
88
The `LangChainAdapter` module provides a way to transform LangChain output streams into AI streams.
99
See the [LangChain Adapter documentation](/providers/adapters/langchain) for more information.
1010

11+
It supports:
12+
13+
- LangChain StringOutputParser streams
14+
- LangChain AIMessageChunk streams
15+
- LangChain StreamEvents v2 streams
16+
1117
## Import
1218

1319
<Snippet text={`import { LangChainAdapter } from "ai"`} prompt={false} />

Diff for: ‎packages/core/streams/langchain-adapter.test.ts

+14
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,18 @@ describe('toAIStream', () => {
2929
['0:"Hello"\n', '0:"World"\n'],
3030
);
3131
});
32+
33+
it('should convert ReadableStream<LangChainStreamEvent>', async () => {
34+
const inputStream = convertArrayToReadableStream([
35+
{ event: 'on_chat_model_stream', data: { chunk: { content: 'Hello' } } },
36+
{ event: 'on_chat_model_stream', data: { chunk: { content: 'World' } } },
37+
]);
38+
39+
assert.deepStrictEqual(
40+
await convertReadableStreamToArray(
41+
toAIStream(inputStream).pipeThrough(new TextDecoderStream()),
42+
),
43+
['0:"Hello"\n', '0:"World"\n'],
44+
);
45+
});
3246
});

Diff for: ‎packages/core/streams/langchain-adapter.ts

+48-13
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ type LangChainAIMessageChunk = {
3737
content: LangChainMessageContent;
3838
};
3939

40+
// LC stream event v2
41+
type LangChainStreamEvent = {
42+
event: string;
43+
data: any;
44+
};
45+
4046
/**
4147
Converts LangChain output streams to AIStream.
4248
@@ -45,28 +51,57 @@ The following streams are supported:
4551
- `string` streams (LangChain `StringOutputParser` output)
4652
*/
4753
export function toAIStream(
48-
stream: ReadableStream<LangChainAIMessageChunk> | ReadableStream<string>,
54+
stream:
55+
| ReadableStream<LangChainStreamEvent>
56+
| ReadableStream<LangChainAIMessageChunk>
57+
| ReadableStream<string>,
4958
callbacks?: AIStreamCallbacksAndOptions,
5059
) {
5160
return stream
5261
.pipeThrough(
53-
new TransformStream<LangChainAIMessageChunk | string>({
54-
transform: async (chunk, controller) => {
55-
if (typeof chunk === 'string') {
56-
controller.enqueue(chunk);
57-
} else if (typeof chunk.content === 'string') {
58-
controller.enqueue(chunk.content);
59-
} else {
60-
const content: LangChainMessageContentComplex[] = chunk.content;
61-
for (const item of content) {
62-
if (item.type === 'text') {
63-
controller.enqueue(item.text);
64-
}
62+
new TransformStream<
63+
LangChainStreamEvent | LangChainAIMessageChunk | string
64+
>({
65+
transform: async (value, controller) => {
66+
// text stream:
67+
if (typeof value === 'string') {
68+
controller.enqueue(value);
69+
return;
70+
}
71+
72+
// LC stream events v2:
73+
if ('event' in value) {
74+
// chunk is AIMessage Chunk for on_chat_model_stream event:
75+
if (value.event === 'on_chat_model_stream') {
76+
forwardAIMessageChunk(
77+
value.data?.chunk as LangChainAIMessageChunk,
78+
controller,
79+
);
6580
}
81+
return;
6682
}
83+
84+
// AI Message chunk stream:
85+
forwardAIMessageChunk(value, controller);
6786
},
6887
}),
6988
)
7089
.pipeThrough(createCallbacksTransformer(callbacks))
7190
.pipeThrough(createStreamDataTransformer());
7291
}
92+
93+
function forwardAIMessageChunk(
94+
chunk: LangChainAIMessageChunk,
95+
controller: TransformStreamDefaultController<any>,
96+
) {
97+
if (typeof chunk.content === 'string') {
98+
controller.enqueue(chunk.content);
99+
} else {
100+
const content: LangChainMessageContentComplex[] = chunk.content;
101+
for (const item of content) {
102+
if (item.type === 'text') {
103+
controller.enqueue(item.text);
104+
}
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)
Please sign in to comment.