diff --git a/plugins/node/opentelemetry-instrumentation-pg/README.md b/plugins/node/opentelemetry-instrumentation-pg/README.md index ac59f6823c..f764adb8b7 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/README.md +++ b/plugins/node/opentelemetry-instrumentation-pg/README.md @@ -46,6 +46,7 @@ PostgreSQL instrumentation has few options available to choose from. You can set | Options | Type | Description | | ------- | ---- | ----------- | | [`enhancedDatabaseReporting`](https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/plugins/node/opentelemetry-instrumentation-pg/src/pg.ts#L48) | `boolean` | If true, additional information about query parameters and results will be attached (as `attributes`) to spans representing database operations | +| `responseHook` | `PgInstrumentationExecutionResponseHook` (function) | Function for adding custom attributes from db response | ## Supported Versions diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/index.ts b/plugins/node/opentelemetry-instrumentation-pg/src/index.ts index 24c76056a1..ae4b1c23b3 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/index.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/index.ts @@ -15,3 +15,8 @@ */ export * from './instrumentation'; +export { + PgInstrumentationConfig, + PgInstrumentationExecutionResponseHook, + PgResponseHookInformation, +} from './types'; diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts index 052aa432e1..7260e4fb2c 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts @@ -16,7 +16,6 @@ import { isWrapped, InstrumentationBase, - InstrumentationConfig, InstrumentationNodeModuleDefinition, } from '@opentelemetry/instrumentation'; @@ -36,6 +35,7 @@ import { PostgresCallback, PgPoolExtended, PgPoolCallback, + PgInstrumentationConfig, } from './types'; import * as utils from './utils'; import { AttributeNames } from './enums/AttributeNames'; @@ -45,17 +45,7 @@ import { } from '@opentelemetry/semantic-conventions'; import { VERSION } from './version'; -export interface PgInstrumentationConfig extends InstrumentationConfig { - /** - * If true, additional information about query parameters and - * results will be attached (as `attributes`) to spans representing - * database operations. - */ - enhancedDatabaseReporting?: boolean; -} - const PG_POOL_COMPONENT = 'pg-pool'; - export class PgInstrumentation extends InstrumentationBase { static readonly COMPONENT = 'pg'; @@ -134,7 +124,7 @@ export class PgInstrumentation extends InstrumentationBase { span = utils.handleParameterizedQuery.call( this, plugin.tracer, - plugin._config as InstrumentationConfig & PgInstrumentationConfig, + plugin.getConfig() as PgInstrumentationConfig, query, params ); @@ -146,7 +136,7 @@ export class PgInstrumentation extends InstrumentationBase { span = utils.handleConfigQuery.call( this, plugin.tracer, - plugin._config as InstrumentationConfig & PgInstrumentationConfig, + plugin.getConfig() as PgInstrumentationConfig, queryConfig ); } else { @@ -164,6 +154,7 @@ export class PgInstrumentation extends InstrumentationBase { if (typeof args[args.length - 1] === 'function') { // Patch ParameterQuery callback args[args.length - 1] = utils.patchCallback( + plugin.getConfig() as PgInstrumentationConfig, span, args[args.length - 1] as PostgresCallback ); @@ -179,6 +170,7 @@ export class PgInstrumentation extends InstrumentationBase { ) { // Patch ConfigQuery callback let callback = utils.patchCallback( + plugin.getConfig() as PgInstrumentationConfig, span, (args[0] as NormalizedQueryConfig).callback! ); @@ -202,6 +194,11 @@ export class PgInstrumentation extends InstrumentationBase { .then((result: unknown) => { // Return a pass-along promise which ends the span and then goes to user's orig resolvers return new Promise(resolve => { + utils.handleExecutionResult( + plugin.getConfig() as PgInstrumentationConfig, + span, + result + ); span.end(); resolve(result); }); diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/types.ts b/plugins/node/opentelemetry-instrumentation-pg/src/types.ts index 35a7b1e292..8728dc1028 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/types.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/types.ts @@ -16,6 +16,31 @@ import * as pgTypes from 'pg'; import * as pgPoolTypes from 'pg-pool'; +import type * as api from '@opentelemetry/api'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; + +export interface PgResponseHookInformation { + data: pgTypes.QueryResult | pgTypes.QueryArrayResult; +} + +export interface PgInstrumentationExecutionResponseHook { + (span: api.Span, responseInfo: PgResponseHookInformation): void; +} + +export interface PgInstrumentationConfig extends InstrumentationConfig { + /** + * If true, additional information about query parameters will be attached (as `attributes`) to spans representing + */ + enhancedDatabaseReporting?: boolean; + + /** + * Hook that allows adding custom span attributes based on the data + * returned from "query" Pg actions. + * + * @default undefined + */ + responseHook?: PgInstrumentationExecutionResponseHook; +} export type PostgresCallback = (err: Error, res: object) => unknown; diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts index ca0e5d4377..87efaf6400 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts @@ -14,7 +14,13 @@ * limitations under the License. */ -import { Span, SpanStatusCode, Tracer, SpanKind } from '@opentelemetry/api'; +import { + Span, + SpanStatusCode, + Tracer, + SpanKind, + diag, +} from '@opentelemetry/api'; import { AttributeNames } from './enums/AttributeNames'; import { SemanticAttributes, @@ -27,9 +33,11 @@ import { PgClientConnectionParams, PgPoolCallback, PgPoolExtended, + PgInstrumentationConfig, } from './types'; import * as pgTypes from 'pg'; -import { PgInstrumentation, PgInstrumentationConfig } from './'; +import { PgInstrumentation } from './'; +import { safeExecuteInTheMiddle } from '@opentelemetry/instrumentation'; function arrayStringifyHelper(arr: Array): string { return '[' + arr.toString() + ']'; @@ -161,7 +169,30 @@ export function handleInvalidQuery( return result; } +export function handleExecutionResult( + config: PgInstrumentationConfig, + span: Span, + pgResult: pgTypes.QueryResult | pgTypes.QueryArrayResult | unknown +) { + if (typeof config.responseHook === 'function') { + safeExecuteInTheMiddle( + () => { + config.responseHook!(span, { + data: pgResult as pgTypes.QueryResult | pgTypes.QueryArrayResult, + }); + }, + err => { + if (err) { + diag.error('Error running response hook', err); + } + }, + true + ); + } +} + export function patchCallback( + instrumentationConfig: PgInstrumentationConfig, span: Span, cb: PostgresCallback ): PostgresCallback { @@ -176,7 +207,10 @@ export function patchCallback( code: SpanStatusCode.ERROR, message: err.message, }); + } else { + handleExecutionResult(instrumentationConfig, span, res); } + span.end(); cb.call(this, err, res); }; diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts index 09fa2720ea..4eadb54f03 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts @@ -24,7 +24,11 @@ import { trace, } from '@opentelemetry/api'; import { BasicTracerProvider } from '@opentelemetry/tracing'; -import { PgInstrumentation } from '../src'; +import { + PgInstrumentation, + PgInstrumentationConfig, + PgResponseHookInformation, +} from '../src'; import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; import * as testUtils from '@opentelemetry/test-utils'; import { @@ -95,6 +99,11 @@ const runCallbackTest = ( }; describe('pg-pool@2.x', () => { + function create(config: PgInstrumentationConfig = {}) { + instrumentation.setConfig(config); + instrumentation.enable(); + } + let pool: pgPool; let contextManager: AsyncHooksContextManager; let instrumentation: PgInstrumentation; @@ -130,6 +139,7 @@ describe('pg-pool@2.x', () => { if (testPostgresLocally) { testUtils.cleanUpDocker('postgres'); } + pool.end(() => { done(); }); @@ -288,5 +298,154 @@ describe('pg-pool@2.x', () => { assert.strictEqual(resNoPromise, undefined, 'No promise is returned'); }); }); + + describe('when specifying a responseHook configuration', () => { + const dataAttributeName = 'pg_data'; + const query = 'SELECT 0::text'; + const events: TimedEvent[] = []; + + describe('AND valid responseHook', () => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [SemanticAttributes.DB_STATEMENT]: query, + [dataAttributeName]: '{"rowCount":1}', + }; + + beforeEach(async () => { + const config: PgInstrumentationConfig = { + enhancedDatabaseReporting: true, + responseHook: ( + span: Span, + responseInfo: PgResponseHookInformation + ) => + span.setAttribute( + dataAttributeName, + JSON.stringify({ rowCount: responseInfo?.data.rowCount }) + ), + }; + + create(config); + }); + + it('should attach response hook data to resulting spans for query with callback ', done => { + const parentSpan = provider + .getTracer('test-pg-pool') + .startSpan('test span'); + context.with(trace.setSpan(context.active(), parentSpan), () => { + const resNoPromise = pool.query(query, (err, result) => { + if (err) { + return done(err); + } + runCallbackTest( + parentSpan, + pgPoolattributes, + events, + unsetStatus, + 2, + 0 + ); + runCallbackTest( + parentSpan, + pgAttributes, + events, + unsetStatus, + 2, + 1 + ); + done(); + }); + assert.strictEqual( + resNoPromise, + undefined, + 'No promise is returned' + ); + }); + }); + + it('should attach response hook data to resulting spans for query returning a Promise', async () => { + const span = provider + .getTracer('test-pg-pool') + .startSpan('test span'); + await context.with( + trace.setSpan(context.active(), span), + async () => { + const result = await pool.query(query); + runCallbackTest( + span, + pgPoolattributes, + events, + unsetStatus, + 2, + 0 + ); + runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1); + assert.ok(result, 'pool.query() returns a promise'); + } + ); + }); + }); + + describe('AND invalid responseHook', () => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [SemanticAttributes.DB_STATEMENT]: query, + }; + + beforeEach(async () => { + create({ + enhancedDatabaseReporting: true, + responseHook: ( + span: Span, + responseInfo: PgResponseHookInformation + ) => { + throw 'some kind of failure!'; + }, + }); + }); + + it('should not do any harm when throwing an exception', done => { + const parentSpan = provider + .getTracer('test-pg-pool') + .startSpan('test span'); + context.with(trace.setSpan(context.active(), parentSpan), () => { + const resNoPromise = pool.query(query, (err, result) => { + if (err) { + return done(err); + } + assert.ok(result); + + runCallbackTest( + parentSpan, + pgPoolattributes, + events, + unsetStatus, + 2, + 0 + ); + runCallbackTest( + parentSpan, + pgAttributes, + events, + unsetStatus, + 2, + 1 + ); + done(); + }); + assert.strictEqual( + resNoPromise, + undefined, + 'No promise is returned' + ); + }); + }); + }); + }); }); }); diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts index 5089cfbd67..250d4ca181 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts @@ -32,7 +32,11 @@ import { } from '@opentelemetry/tracing'; import * as assert from 'assert'; import type * as pg from 'pg'; -import { PgInstrumentation } from '../src'; +import { + PgInstrumentation, + PgInstrumentationConfig, + PgResponseHookInformation, +} from '../src'; import { AttributeNames } from '../src/enums/AttributeNames'; import { TimedEvent } from './types'; import { @@ -86,11 +90,17 @@ const runCallbackTest = ( }; describe('pg@7.x', () => { + function create(config: PgInstrumentationConfig = {}) { + instrumentation.setConfig(config); + instrumentation.enable(); + } + let client: pg.Client; let instrumentation: PgInstrumentation; let contextManager: AsyncHooksContextManager; const provider = new BasicTracerProvider(); const tracer = provider.getTracer('external'); + const testPostgres = process.env.RUN_POSTGRES_TESTS; // For CI: assumes local postgres db is already available const testPostgresLocally = process.env.RUN_POSTGRES_TESTS_LOCAL; // For local: spins up local postgres db via docker const shouldTest = testPostgres || testPostgresLocally; // Skips these tests if false (default) @@ -187,7 +197,7 @@ describe('pg@7.x', () => { assert.strictEqual(res, undefined, 'No promise is returned'); }); - it('should return a promise if callback is provided', done => { + it('should return a promise if callback is not provided', done => { const resPromise = client.query('SELECT NOW()'); resPromise .then(res => { @@ -367,6 +377,103 @@ describe('pg@7.x', () => { }); }); + describe('when specifying a responseHook configuration', () => { + const dataAttributeName = 'pg_data'; + const query = 'SELECT 0::text'; + const events: TimedEvent[] = []; + + describe('AND valid responseHook', () => { + const attributes = { + ...DEFAULT_ATTRIBUTES, + [SemanticAttributes.DB_STATEMENT]: query, + [dataAttributeName]: '{"rowCount":1}', + }; + beforeEach(async () => { + const config: PgInstrumentationConfig = { + enhancedDatabaseReporting: true, + responseHook: ( + span: Span, + responseInfo: PgResponseHookInformation + ) => + span.setAttribute( + dataAttributeName, + JSON.stringify({ rowCount: responseInfo?.data.rowCount }) + ), + }; + create(config); + }); + + it('should attach response hook data to resulting spans for query with callback ', done => { + const span = tracer.startSpan('test span'); + context.with(trace.setSpan(context.active(), span), () => { + const res = client.query(query, (err, res) => { + assert.strictEqual(err, null); + assert.ok(res); + runCallbackTest(span, attributes, events); + done(); + }); + assert.strictEqual(res, undefined, 'No promise is returned'); + }); + }); + + it('should attach response hook data to resulting spans for query returning a Promise', async () => { + const attributes = { + ...DEFAULT_ATTRIBUTES, + [SemanticAttributes.DB_STATEMENT]: query, + [dataAttributeName]: '{"rowCount":1}', + }; + + const span = tracer.startSpan('test span'); + await context.with( + trace.setSpan(context.active(), span), + async () => { + const resPromise = await client.query({ + text: query, + }); + try { + assert.ok(resPromise); + runCallbackTest(span, attributes, events); + } catch (e) { + assert.ok(false, e.message); + } + } + ); + }); + }); + + describe('AND invalid responseHook', () => { + const attributes = { + ...DEFAULT_ATTRIBUTES, + [SemanticAttributes.DB_STATEMENT]: query, + }; + + beforeEach(async () => { + create({ + enhancedDatabaseReporting: true, + responseHook: ( + span: Span, + responseInfo: PgResponseHookInformation + ) => { + throw 'some kind of failure!'; + }, + }); + }); + + it('should not do any harm when throwing an exception', done => { + const span = tracer.startSpan('test span'); + context.with(trace.setSpan(context.active(), span), () => { + const res = client.query(query, (err, res) => { + assert.strictEqual(err, null); + assert.ok(res); + runCallbackTest(span, attributes, events); + done(); + }); + assert.strictEqual(res, undefined, 'No promise is returned'); + }); + }); + }); + }); + it('should handle the same callback being given to multiple client.query()s', done => { let events = 0; const parent = tracer.startSpan('parent');