Commit 03abfce

Authored Feb 29, 2024

feat: Added instrumentation for chain.stream for langchain js. (#2052)

1 parent 4830ea3 · commit 03abfce
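For orientation, here is a minimal sketch of the application-side pattern this commit's new instrumentation targets: a RunnableSequence consumed via `chain.stream()`. The `@langchain/openai` model, prompt template, and parser choices are illustrative assumptions, not taken from this diff.

// Sketch only (assumed app code): a LangChain JS chain streamed with chain.stream().
const { ChatPromptTemplate } = require('@langchain/core/prompts')
const { StringOutputParser } = require('@langchain/core/output_parsers')
const { ChatOpenAI } = require('@langchain/openai')

async function main() {
  const prompt = ChatPromptTemplate.fromTemplate('Tell me about a famous {topic}')
  const model = new ChatOpenAI({ apiKey: process.env.OPENAI_API_KEY })
  const chain = prompt.pipe(model).pipe(new StringOutputParser())

  // chain.stream() returns an async-iterable stream; the instrumentation added
  // below wraps both the stream() call and the iterator's next() method so it
  // can emit LlmChatCompletionSummary/Message events once iteration finishes.
  const stream = await chain.stream({ topic: 'scientist' })
  for await (const chunk of stream) {
    process.stdout.write(chunk)
  }
}

main().catch(console.error)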

File tree

6 files changed: +785 −44 lines changed

lib/instrumentation/langchain/runnable.js

+190 −38

@@ -6,9 +6,8 @@
 'use strict'
 
 const common = require('./common')
-const {
-  AI: { LANGCHAIN }
-} = require('../../metrics/names')
+const { AI } = require('../../metrics/names')
+const { LANGCHAIN } = AI
 const {
   LangChainCompletionMessage,
   LangChainCompletionSummary
@@ -28,10 +27,33 @@ module.exports = function initialize(shim, langchain) {
     return
   }
 
+  instrumentInvokeChain({ langchain, shim })
+
+  if (agent.config.ai_monitoring.streaming.enabled) {
+    instrumentStream({ langchain, shim })
+  } else {
+    shim.logger.warn(
+      '`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
+    )
+    agent.metrics.getOrCreateMetric(AI.STREAMING_DISABLED).incrementCallCount()
+    agent.metrics
+      .getOrCreateMetric(`${LANGCHAIN.TRACKING_PREFIX}/${pkgVersion}`)
+      .incrementCallCount()
+  }
+}
+
+/**
+ * Instruments and records span and relevant LLM events for `chain.invoke`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentInvokeChain({ langchain, shim }) {
   shim.record(
     langchain.RunnableSequence.prototype,
     'invoke',
-    function wrapCall(shim, invoke, fnName, args) {
+    function wrapCall(shim, _invoke, fnName, args) {
      const [request, params] = args
      const metadata = params?.metadata ?? {}
      const tags = params?.tags ?? []
@@ -41,53 +63,183 @@
         promise: true,
         // eslint-disable-next-line max-params
         after(_shim, _fn, _name, err, output, segment) {
-          segment.end()
-          const completionSummary = new LangChainCompletionSummary({
-            agent,
-            messages: [{ output }],
-            metadata,
-            tags,
+          recordChatCompletionEvents({
             segment,
-            error: err != null,
-            runId: segment[langchainRunId]
-          })
-
-          common.recordEvent({
-            agent,
-            type: 'LlmChatCompletionSummary',
-            pkgVersion,
-            msg: completionSummary
-          })
-
-          // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
-          // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
-          recordCompletions({
+            messages: [output],
             events: [request, output],
-            completionSummary,
-            agent,
-            segment,
+            metadata,
+            tags,
+            err,
             shim
           })
+        }
+      })
+    }
+  )
+}
+
+/**
+ * Instruments and records span and relevant LLM events for `chain.stream`
+ *
+ * @param {object} params function params
+ * @param {object} params.langchain `@langchain/core/runnables/base` export
+ * @param {Shim} params.shim instance of shim
+ */
+function instrumentStream({ langchain, shim }) {
+  shim.record(
+    langchain.RunnableSequence.prototype,
+    'stream',
+    function wrapStream(shim, _stream, fnName, args) {
+      const [request, params] = args
+      const metadata = params?.metadata ?? {}
+      const tags = params?.tags ?? []
 
-          if (err) {
-            agent.errors.add(
-              segment.transaction,
+      return new RecorderSpec({
+        name: `${LANGCHAIN.CHAIN}/${fnName}`,
+        promise: true,
+        // eslint-disable-next-line max-params
+        after(_shim, _fn, _name, err, output, segment) {
+          // Input error occurred which means a stream was not created.
+          // Skip instrumenting streaming and create Llm Events from
+          // the data we have
+          if (output?.next) {
+            wrapNextHandler({ shim, output, segment, request, metadata, tags })
+          } else {
+            recordChatCompletionEvents({
+              segment,
+              messages: [],
+              events: [request],
+              metadata,
+              tags,
               err,
-              new LlmErrorMessage({
-                response: {},
-                cause: err,
-                summary: completionSummary
-              })
-            )
+              shim
+            })
           }
-
-          segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
         }
       })
     }
   )
 }
 
+/**
+ * Wraps the next method on the IterableReadableStream. It will also record the Llm
+ * events when the stream is done processing.
+ *
+ * @param {object} params function params
+ * @param {Shim} params.shim shim instance
+ * @param {TraceSegment} params.segment active segment
+ * @param {function} params.output IterableReadableStream
+ * @param {string} params.request the prompt message
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ */
+function wrapNextHandler({ shim, output, segment, request, metadata, tags }) {
+  shim.wrap(output, 'next', function wrapIterator(shim, orig) {
+    let content = ''
+    return async function wrappedIterator() {
+      try {
+        const result = await orig.apply(this, arguments)
+        // only create Llm events when stream iteration is done
+        if (result?.done) {
+          recordChatCompletionEvents({
+            segment,
+            messages: [content],
+            events: [request, content],
+            metadata,
+            tags,
+            shim
+          })
+        } else {
+          content += result.value
+        }
+        return result
+      } catch (error) {
+        recordChatCompletionEvents({
+          segment,
+          messages: [content],
+          events: [request, content],
+          metadata,
+          tags,
+          err: error,
+          shim
+        })
+        throw error
+      } finally {
+        // update segment duration on every stream iteration to extend
+        // the timer
+        segment.touch()
+      }
+    }
+  })
+}
+
+/**
+ * Ends active segment, creates LlmChatCompletionSummary, and LlmChatCompletionMessage(s), and handles errors if they exist
+ *
+ * @param {object} params function params
+ * @param {TraceSegment} params.segment active segment
+ * @param {Array} params.messages response messages
+ * @param {Array} params.events prompt and response messages
+ * @param {object} params.metadata metadata for the call
+ * @param {Array} params.tags tags for the call
+ * @param {Error} params.err error object from call
+ * @param {Shim} params.shim shim instance
+ */
+function recordChatCompletionEvents({ segment, messages, events, metadata, tags, err, shim }) {
+  const { pkgVersion, agent } = shim
+  segment.end()
+  const completionSummary = new LangChainCompletionSummary({
+    agent,
+    messages,
+    metadata,
+    tags,
+    segment,
+    error: err != null,
+    runId: segment[langchainRunId]
+  })
+
+  common.recordEvent({
+    agent,
+    type: 'LlmChatCompletionSummary',
+    pkgVersion,
+    msg: completionSummary
+  })
+
+  // output can be BaseMessage with a content property https://js.langchain.com/docs/modules/model_io/concepts#messages
+  // or an output parser https://js.langchain.com/docs/modules/model_io/concepts#output-parsers
+  recordCompletions({
+    events,
+    completionSummary,
+    agent,
+    segment,
+    shim
+  })
+
+  if (err) {
+    agent.errors.add(
+      segment.transaction,
+      err,
+      new LlmErrorMessage({
+        response: {},
+        cause: err,
+        summary: completionSummary
+      })
+    )
+  }
+
+  segment.transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
+}
+
+/**
+ * Records the LlmChatCompletionMessage(s)
+ *
+ * @param {object} params function params
+ * @param {Array} params.events prompt and response messages
+ * @param {LangChainCompletionSummary} params.completionSummary LlmChatCompletionSummary event
+ * @param {Agent} params.agent instance of agent
+ * @param {TraceSegment} params.segment active segment
+ * @param {Shim} params.shim shim instance
+ */
 function recordCompletions({ events, completionSummary, agent, segment, shim }) {
   for (let i = 0; i < events.length; i += 1) {
     let msg = events[i]
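Note that the streaming wrapper above is gated on `agent.config.ai_monitoring.streaming.enabled`. A rough sketch of a `newrelic.js` that would take the `else` branch; only the setting path comes from this diff, the rest of the config shape is assumed:

// newrelic.js (sketch, assumed shape): with streaming disabled the agent logs the
// warning above, increments the streaming-disabled supportability metric, and
// leaves chain.stream() unwrapped, while chain.invoke() stays instrumented.
'use strict'

exports.config = {
  app_name: ['my-langchain-app'],
  license_key: process.env.NEW_RELIC_LICENSE_KEY,
  ai_monitoring: {
    enabled: true,
    streaming: {
      enabled: false
    }
  }
}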

lib/llm-events/langchain/event.js

+1 −2

@@ -51,7 +51,6 @@ class LangChainEvent extends BaseEvent {
   ingest_source = 'Node'
   vendor = 'langchain'
   virtual_llm = true
-  error = false
 
   constructor(params = defaultParams) {
     params = Object.assign({}, defaultParams, params)
@@ -66,7 +65,7 @@ class LangChainEvent extends BaseEvent {
     this.langchainMeta = params.metadata
     this.metadata = agent
     this.tags = Array.isArray(params.tags) ? params.tags.join(',') : params.tags
-    this.error = params.error ?? false
+    this.error = params.error ?? null
 
     if (params.virtual !== undefined) {
       if (params.virtual !== true && params.virtual !== false) {
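With the change above, a `LangChainEvent` reports `error: null` unless a value is passed, instead of the old `false` class-field default. A small sketch of both cases; the `agent`/`segment` constructor plumbing is assumed from the unit-test context in the next file:

// Sketch: error attribute behaviour after this change (agent and segment are
// assumed to be a live Agent instance and an active TraceSegment).
const LangChainEvent = require('./lib/llm-events/langchain/event')

const quiet = new LangChainEvent({ agent, segment, runId: 'run-1' })
// quiet.error === null  (previously false via the removed `error = false` field)

const failed = new LangChainEvent({ agent, segment, runId: 'run-1', error: true })
// failed.error === true (asserted by the new 'sets error property' unit test)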

test/unit/llm-events/langchain/event.test.js

+7

@@ -62,6 +62,7 @@ tap.test('constructs default instance', async (t) => {
     ['metadata.foo']: 'foo',
     ingest_source: 'Node',
     vendor: 'langchain',
+    error: null,
     virtual_llm: true
   })
 })
@@ -103,3 +104,9 @@ tap.test('sets tags from string', async (t) => {
   const msg = new LangChainEvent(t.context)
   t.equal(msg.tags, 'foo,bar')
 })
+
+tap.test('sets error property', async (t) => {
+  t.context.error = true
+  const msg = new LangChainEvent(t.context)
+  t.equal(msg.error, true)
+})

test/versioned/langchain/common.js

+10 −3

@@ -48,7 +48,14 @@ function assertLangChainChatCompletionSummary({ tx, chatSummary, withCallback })
   this.match(chatSummary[1], expectedSummary, 'should match chat summary message')
 }
 
-function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, withCallback }) {
+function assertLangChainChatCompletionMessages({
+  tx,
+  chatMsgs,
+  chatSummary,
+  withCallback,
+  input = '{"topic":"scientist"}',
+  output = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
+}) {
   const baseMsg = {
     id: /[a-f0-9]{36}/,
     appName: 'New Relic for Node.js tests',
@@ -71,11 +78,11 @@ function assertLangChainChatCompletionMessages({ tx, chatMsgs, chatSummary, with
     const expectedChatMsg = { ...baseMsg }
     if (msg[1].sequence === 0) {
       expectedChatMsg.sequence = 0
-      expectedChatMsg.content = '{"topic":"scientist"}'
+      expectedChatMsg.content = input
       expectedChatMsg.is_response = false
     } else if (msg[1].sequence === 1) {
      expectedChatMsg.sequence = 1
-      expectedChatMsg.content = '212 degrees Fahrenheit is equal to 100 degrees Celsius.'
+      expectedChatMsg.content = output
       expectedChatMsg.is_response = true
     }
 
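The new `input`/`output` parameters let the streaming tests reuse this assertion helper with content other than the `invoke()` defaults. A sketch of such a call; the tap `t` binding and the `tx`/`chatMsgs`/`chatSummary` plumbing are assumed from the existing runnables tests, and the streamed output value is hypothetical:

// Sketch: asserting recorded prompt/response messages for a streamed chain.
assertLangChainChatCompletionMessages.call(t, {
  tx,
  chatMsgs,
  chatSummary,
  input: '{"topic":"scientist"}',
  output: 'streamed chunks concatenated into the final response' // hypothetical
})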
test/versioned/langchain/package.json

+2 −1

@@ -16,7 +16,8 @@
     },
     "files": [
       "tools.tap.js",
-      "runnables.tap.js"
+      "runnables.tap.js",
+      "runnables-streaming.tap.js"
     ]
   }
 ]

test/versioned/langchain/runnables-streaming.tap.js

+575
Large diffs are not rendered by default.
