Skip to content

Commit

Permalink
feat: postgresql responseHook support
Browse files Browse the repository at this point in the history
  • Loading branch information
nata7che committed Jun 13, 2021
1 parent 676dbbc commit 3f8cdb0
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 17 deletions.
1 change: 1 addition & 0 deletions plugins/node/opentelemetry-instrumentation-pg/README.md
Expand Up @@ -46,6 +46,7 @@ PostgreSQL instrumentation has few options available to choose from. You can set
| Options | Type | Description |
| ------- | ---- | ----------- |
| [`enhancedDatabaseReporting`](https://github.com/open-telemetry/opentelemetry-js-contrib/blob/main/plugins/node/opentelemetry-instrumentation-pg/src/pg.ts#L48) | `boolean` | If true, additional information about query parameters and results will be attached (as `attributes`) to spans representing database operations |
| `responseHook` | `PgInstrumentationExecutionResponseHook` (function) | Function for adding custom attributes from db response |

## Supported Versions

Expand Down
5 changes: 5 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/index.ts
Expand Up @@ -15,3 +15,8 @@
*/

export * from './instrumentation';
export {
PgInstrumentationConfig,
PgInstrumentationExecutionResponseHook,
PgResponseHookInformation,
} from './types';
Expand Up @@ -36,6 +36,7 @@ import {
PostgresCallback,
PgPoolExtended,
PgPoolCallback,
PgInstrumentationConfig,
} from './types';
import * as utils from './utils';
import { AttributeNames } from './enums/AttributeNames';
Expand All @@ -45,17 +46,7 @@ import {
} from '@opentelemetry/semantic-conventions';
import { VERSION } from './version';

export interface PgInstrumentationConfig extends InstrumentationConfig {
/**
* If true, additional information about query parameters and
* results will be attached (as `attributes`) to spans representing
* database operations.
*/
enhancedDatabaseReporting?: boolean;
}

const PG_POOL_COMPONENT = 'pg-pool';

export class PgInstrumentation extends InstrumentationBase {
static readonly COMPONENT = 'pg';

Expand Down Expand Up @@ -134,7 +125,8 @@ export class PgInstrumentation extends InstrumentationBase {
span = utils.handleParameterizedQuery.call(
this,
plugin.tracer,
plugin._config as InstrumentationConfig & PgInstrumentationConfig,
plugin.getConfig() as PgInstrumentationConfig &
InstrumentationConfig,
query,
params
);
Expand All @@ -146,7 +138,8 @@ export class PgInstrumentation extends InstrumentationBase {
span = utils.handleConfigQuery.call(
this,
plugin.tracer,
plugin._config as InstrumentationConfig & PgInstrumentationConfig,
plugin.getConfig() as PgInstrumentationConfig &
InstrumentationConfig,
queryConfig
);
} else {
Expand All @@ -164,6 +157,8 @@ export class PgInstrumentation extends InstrumentationBase {
if (typeof args[args.length - 1] === 'function') {
// Patch ParameterQuery callback
args[args.length - 1] = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig &
InstrumentationConfig,
span,
args[args.length - 1] as PostgresCallback
);
Expand All @@ -179,6 +174,8 @@ export class PgInstrumentation extends InstrumentationBase {
) {
// Patch ConfigQuery callback
let callback = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig &
InstrumentationConfig,
span,
(args[0] as NormalizedQueryConfig).callback!
);
Expand All @@ -202,6 +199,12 @@ export class PgInstrumentation extends InstrumentationBase {
.then((result: unknown) => {
// Return a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise(resolve => {
utils.handleExecutionResult(
plugin.getConfig() as PgInstrumentationConfig &
InstrumentationConfig,
span,
result
);
span.end();
resolve(result);
});
Expand Down
25 changes: 25 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/types.ts
Expand Up @@ -16,6 +16,31 @@

import * as pgTypes from 'pg';
import * as pgPoolTypes from 'pg-pool';
import type * as api from '@opentelemetry/api';
import { InstrumentationConfig } from '@opentelemetry/instrumentation';

export interface PgResponseHookInformation {
data: pgTypes.QueryResult | pgTypes.QueryArrayResult;
}

export interface PgInstrumentationExecutionResponseHook {
(span: api.Span, responseInfo: PgResponseHookInformation): void;
}

export interface PgInstrumentationConfig extends InstrumentationConfig {
/**
* If true, additional information about query parameters will be attached (as `attributes`) to spans representing
*/
enhancedDatabaseReporting?: boolean;

/**
* Hook that allows adding custom span attributes based on the data
* returned from "query" Pg actions.
*
* @default undefined
*/
responseHook?: PgInstrumentationExecutionResponseHook;
}

export type PostgresCallback = (err: Error, res: object) => unknown;

Expand Down
38 changes: 36 additions & 2 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Expand Up @@ -14,7 +14,13 @@
* limitations under the License.
*/

import { Span, SpanStatusCode, Tracer, SpanKind } from '@opentelemetry/api';
import {
Span,
SpanStatusCode,
Tracer,
SpanKind,
diag,
} from '@opentelemetry/api';
import { AttributeNames } from './enums/AttributeNames';
import {
SemanticAttributes,
Expand All @@ -27,9 +33,11 @@ import {
PgClientConnectionParams,
PgPoolCallback,
PgPoolExtended,
PgInstrumentationConfig,
} from './types';
import * as pgTypes from 'pg';
import { PgInstrumentation, PgInstrumentationConfig } from './';
import { PgInstrumentation } from './';
import { safeExecuteInTheMiddle } from '@opentelemetry/instrumentation';

function arrayStringifyHelper(arr: Array<unknown>): string {
return '[' + arr.toString() + ']';
Expand Down Expand Up @@ -161,7 +169,30 @@ export function handleInvalidQuery(
return result;
}

export function handleExecutionResult(
config: PgInstrumentationConfig,
span: Span,
pgResult: pgTypes.QueryResult | pgTypes.QueryArrayResult | unknown
) {
if (config.responseHook !== undefined && pgResult !== undefined) {
safeExecuteInTheMiddle(
() => {
config.responseHook!(span, {
data: pgResult as pgTypes.QueryResult | pgTypes.QueryArrayResult,
});
},
err => {
if (err) {
diag.error('Error running response hook', err);
}
},
true
);
}
}

export function patchCallback(
instrumentationConfig: PgInstrumentationConfig,
span: Span,
cb: PostgresCallback
): PostgresCallback {
Expand All @@ -176,7 +207,10 @@ export function patchCallback(
code: SpanStatusCode.ERROR,
message: err.message,
});
} else {
handleExecutionResult(instrumentationConfig, span, res);
}

span.end();
cb.call(this, err, res);
};
Expand Down
161 changes: 160 additions & 1 deletion plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts
Expand Up @@ -24,7 +24,11 @@ import {
trace,
} from '@opentelemetry/api';
import { BasicTracerProvider } from '@opentelemetry/tracing';
import { PgInstrumentation } from '../src';
import {
PgInstrumentation,
PgInstrumentationConfig,
PgResponseHookInformation,
} from '../src';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import * as testUtils from '@opentelemetry/test-utils';
import {
Expand Down Expand Up @@ -95,6 +99,11 @@ const runCallbackTest = (
};

describe('pg-pool@2.x', () => {
function create(config: PgInstrumentationConfig = {}) {
instrumentation.setConfig(config);
instrumentation.enable();
}

let pool: pgPool<pg.Client>;
let contextManager: AsyncHooksContextManager;
let instrumentation: PgInstrumentation;
Expand Down Expand Up @@ -130,6 +139,7 @@ describe('pg-pool@2.x', () => {
if (testPostgresLocally) {
testUtils.cleanUpDocker('postgres');
}

pool.end(() => {
done();
});
Expand Down Expand Up @@ -288,5 +298,154 @@ describe('pg-pool@2.x', () => {
assert.strictEqual(resNoPromise, undefined, 'No promise is returned');
});
});

describe('when specifying a responseHook configuration', () => {
const dataAttributeName = 'pg_data';
const query = 'SELECT 0::text';
const events: TimedEvent[] = [];

describe('AND valid responseHook', () => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[SemanticAttributes.DB_STATEMENT]: query,
[dataAttributeName]: '{"rowCount":1}',
};

beforeEach(async () => {
const config: PgInstrumentationConfig = {
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
responseInfo: PgResponseHookInformation
) =>
span.setAttribute(
dataAttributeName,
JSON.stringify({ rowCount: responseInfo?.data.rowCount })
),
};

create(config);
});

it('should attach response hook data to resulting spans for query with callback ', done => {
const parentSpan = provider
.getTracer('test-pg-pool')
.startSpan('test span');
context.with(trace.setSpan(context.active(), parentSpan), () => {
const resNoPromise = pool.query(query, (err, result) => {
if (err) {
return done(err);
}
runCallbackTest(
parentSpan,
pgPoolattributes,
events,
unsetStatus,
2,
0
);
runCallbackTest(
parentSpan,
pgAttributes,
events,
unsetStatus,
2,
1
);
done();
});
assert.strictEqual(
resNoPromise,
undefined,
'No promise is returned'
);
});
});

it('should attach response hook data to resulting spans for query returning a Promise', async () => {
const span = provider
.getTracer('test-pg-pool')
.startSpan('test span');
await context.with(
trace.setSpan(context.active(), span),
async () => {
const result = await pool.query(query);
runCallbackTest(
span,
pgPoolattributes,
events,
unsetStatus,
2,
0
);
runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1);
assert.ok(result, 'pool.query() returns a promise');
}
);
});
});

describe('AND invalid responseHook', () => {
const pgPoolattributes = {
...DEFAULT_PGPOOL_ATTRIBUTES,
};
const pgAttributes = {
...DEFAULT_PG_ATTRIBUTES,
[SemanticAttributes.DB_STATEMENT]: query,
};

beforeEach(async () => {
create({
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
responseInfo: PgResponseHookInformation
) => {
throw 'some kind of failure!';
},
});
});

it('should not do any harm when throwing an exception', done => {
const parentSpan = provider
.getTracer('test-pg-pool')
.startSpan('test span');
context.with(trace.setSpan(context.active(), parentSpan), () => {
const resNoPromise = pool.query(query, (err, result) => {
if (err) {
return done(err);
}
assert.ok(result);

runCallbackTest(
parentSpan,
pgPoolattributes,
events,
unsetStatus,
2,
0
);
runCallbackTest(
parentSpan,
pgAttributes,
events,
unsetStatus,
2,
1
);
done();
});
assert.strictEqual(
resNoPromise,
undefined,
'No promise is returned'
);
});
});
});
});
});
});

0 comments on commit 3f8cdb0

Please sign in to comment.