diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 7b5dc98446d..3b245a3dbf1 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase< DEFAULT_HEADERS: Record = {}; headers: Record; agent: http.Agent | https.Agent | undefined; + compress: boolean; constructor(config: CollectorExporterNodeConfigBase = {}) { super(config); @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase< baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS) ); this.agent = createHttpAgent(config); + this.compress = config.compress || false; } onInit(_config: CollectorExporterNodeConfigBase): void { @@ -86,6 +88,7 @@ export abstract class CollectorExporterNodeBase< this, JSON.stringify(serviceRequest), 'application/json', + this.compress, _onSuccess, _onError ); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts index cee0f477a58..df42ea72c5b 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts @@ -24,5 +24,6 @@ import { CollectorExporterConfigBase } from '../../types'; export interface CollectorExporterNodeConfigBase extends CollectorExporterConfigBase { keepAlive?: boolean; + compress?: boolean; httpAgentOptions?: http.AgentOptions | https.AgentOptions; } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index fb303dd2170..2773824a498 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -16,11 +16,15 @@ import * as url from 'url'; import * as http from 'http'; import * as https from 'https'; +import * as zlib from 'zlib'; +import { pipeline, Readable } from 'stream'; import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterNodeConfigBase } from '.'; import { diag } from '@opentelemetry/api'; +const gzip = zlib.createGzip(); + /** * Sends data using http * @param collector @@ -33,6 +37,7 @@ export function sendWithHttp( collector: CollectorExporterNodeBase, data: string | Buffer, contentType: string, + compress: boolean, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -46,6 +51,7 @@ export function sendWithHttp( headers: { 'Content-Length': Buffer.byteLength(data), 'Content-Type': contentType, + 'Content-Encoding': 'gzip', ...collector.headers, }, agent: collector.agent, @@ -71,13 +77,32 @@ export function sendWithHttp( }); }); + req.on('error', (error: Error) => { onError(error); }); - req.write(data); - req.end(); + + if (compress) { + const dataStream = Readable.from(data); + pipeline(dataStream, gzip, req, onGzipError(onError)); + } else { + req.write(data); + req.end(); + } } +function onGzipError(onError: (error: collectorTypes.CollectorExporterError) => void) { + return (err: NodeJS.ErrnoException | null) => { + const error = new collectorTypes.CollectorExporterError( + err?.message, + 500, + 'Compressing the request body for collector exporter failed.' + ); + onError(error) + } +} + + export function createHttpAgent( config: CollectorExporterNodeConfigBase ): http.Agent | https.Agent | undefined { diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index d64ef15db11..72bc1591aab 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -20,6 +20,8 @@ import { ReadableSpan } from '@opentelemetry/tracing'; import * as http from 'http'; import * as assert from 'assert'; import * as sinon from 'sinon'; +import { PassThrough, Stream } from 'stream'; +import * as zlib from 'zlib'; import { CollectorTraceExporter, CollectorExporterNodeConfigBase, @@ -33,11 +35,7 @@ import { mockedReadableSpan, } from '../helper'; -const fakeRequest = { - end: function () {}, - on: function () {}, - write: function () {}, -}; +let fakeRequest: PassThrough; const address = 'localhost:1501'; @@ -45,10 +43,10 @@ describe('CollectorTraceExporter - node with json over http', () => { let collectorExporter: CollectorTraceExporter; let collectorExporterConfig: CollectorExporterNodeConfigBase; let stubRequest: sinon.SinonStub; - let stubWrite: sinon.SinonStub; let spans: ReadableSpan[]; afterEach(() => { + fakeRequest = new Stream.PassThrough(); sinon.restore(); }); @@ -109,7 +107,6 @@ describe('CollectorTraceExporter - node with json over http', () => { describe('export', () => { beforeEach(() => { stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); - stubWrite = sinon.stub(fakeRequest, 'write'); collectorExporterConfig = { headers: { foo: 'bar', @@ -126,7 +123,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should open the connection', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -140,7 +137,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should set custom headers', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -151,7 +148,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should have keep alive and keepAliveMsecs option set', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -164,8 +161,8 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('different http export requests should use the same agent', done => { - collectorExporter.export(spans, () => {}); - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); + collectorExporter.export(spans, () => { }); setTimeout(() => { const [firstExportAgent, secondExportAgent] = stubRequest.args.map( @@ -177,12 +174,14 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = buff.toString(); - setTimeout(() => { - const writeArgs = stubWrite.args[0]; const json = JSON.parse( - writeArgs[0] + responseBody ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; const span1 = json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; @@ -195,6 +194,10 @@ describe('CollectorTraceExporter - node with json over http', () => { done(); }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); }); it('should log the successful message', done => { @@ -242,6 +245,55 @@ describe('CollectorTraceExporter - node with json over http', () => { }); }); }); + + describe('export - with compression', () => { + beforeEach(() => { + stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + hostname: 'foo', + attributes: {}, + url: 'http://foo.bar.com', + keepAlive: true, + compress: true, + httpAgentOptions: { keepAliveMsecs: 2000 }, + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + + it('should successfully send the spans', done => { + collectorExporter.export(spans, () => { }); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = zlib.gunzipSync(buff).toString(); + + const json = JSON.parse( + responseBody + ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + }); + + }); + describe('CollectorTraceExporter - node (getDefaultUrl)', () => { it('should default to localhost', done => { const collectorExporter = new CollectorTraceExporter();