From c358a65e5df0ac426a1b62591ee10de4e163e80f Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Thu, 8 Jul 2021 14:13:06 +0200 Subject: [PATCH 1/7] feat(exporter-collector): support gzip compression for node exporter collector --- .../node/CollectorExporterNodeBase.ts | 3 + .../src/platform/node/types.ts | 1 + .../src/platform/node/util.ts | 29 ++++++- .../test/node/CollectorTraceExporter.test.ts | 84 +++++++++++++++---- 4 files changed, 99 insertions(+), 18 deletions(-) diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 7b5dc98446..3b245a3dbf 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase< DEFAULT_HEADERS: Record = {}; headers: Record; agent: http.Agent | https.Agent | undefined; + compress: boolean; constructor(config: CollectorExporterNodeConfigBase = {}) { super(config); @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase< baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS) ); this.agent = createHttpAgent(config); + this.compress = config.compress || false; } onInit(_config: CollectorExporterNodeConfigBase): void { @@ -86,6 +88,7 @@ export abstract class CollectorExporterNodeBase< this, JSON.stringify(serviceRequest), 'application/json', + this.compress, _onSuccess, _onError ); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts index cee0f477a5..df42ea72c5 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts @@ -24,5 +24,6 @@ import { CollectorExporterConfigBase } from '../../types'; export interface CollectorExporterNodeConfigBase extends CollectorExporterConfigBase { keepAlive?: boolean; + compress?: boolean; httpAgentOptions?: http.AgentOptions | https.AgentOptions; } diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index fb303dd217..2773824a49 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -16,11 +16,15 @@ import * as url from 'url'; import * as http from 'http'; import * as https from 'https'; +import * as zlib from 'zlib'; +import { pipeline, Readable } from 'stream'; import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterNodeConfigBase } from '.'; import { diag } from '@opentelemetry/api'; +const gzip = zlib.createGzip(); + /** * Sends data using http * @param collector @@ -33,6 +37,7 @@ export function sendWithHttp( collector: CollectorExporterNodeBase, data: string | Buffer, contentType: string, + compress: boolean, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -46,6 +51,7 @@ export function sendWithHttp( headers: { 'Content-Length': Buffer.byteLength(data), 'Content-Type': contentType, + 'Content-Encoding': 'gzip', ...collector.headers, }, agent: collector.agent, @@ -71,13 +77,32 @@ export function sendWithHttp( }); }); + req.on('error', (error: Error) => { onError(error); }); - req.write(data); - req.end(); + + if (compress) { + const dataStream = Readable.from(data); + pipeline(dataStream, gzip, req, onGzipError(onError)); + } else { + req.write(data); + req.end(); + } } +function onGzipError(onError: (error: collectorTypes.CollectorExporterError) => void) { + return (err: NodeJS.ErrnoException | null) => { + const error = new collectorTypes.CollectorExporterError( + err?.message, + 500, + 'Compressing the request body for collector exporter failed.' + ); + onError(error) + } +} + + export function createHttpAgent( config: CollectorExporterNodeConfigBase ): http.Agent | https.Agent | undefined { diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index d64ef15db1..72bc1591aa 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -20,6 +20,8 @@ import { ReadableSpan } from '@opentelemetry/tracing'; import * as http from 'http'; import * as assert from 'assert'; import * as sinon from 'sinon'; +import { PassThrough, Stream } from 'stream'; +import * as zlib from 'zlib'; import { CollectorTraceExporter, CollectorExporterNodeConfigBase, @@ -33,11 +35,7 @@ import { mockedReadableSpan, } from '../helper'; -const fakeRequest = { - end: function () {}, - on: function () {}, - write: function () {}, -}; +let fakeRequest: PassThrough; const address = 'localhost:1501'; @@ -45,10 +43,10 @@ describe('CollectorTraceExporter - node with json over http', () => { let collectorExporter: CollectorTraceExporter; let collectorExporterConfig: CollectorExporterNodeConfigBase; let stubRequest: sinon.SinonStub; - let stubWrite: sinon.SinonStub; let spans: ReadableSpan[]; afterEach(() => { + fakeRequest = new Stream.PassThrough(); sinon.restore(); }); @@ -109,7 +107,6 @@ describe('CollectorTraceExporter - node with json over http', () => { describe('export', () => { beforeEach(() => { stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); - stubWrite = sinon.stub(fakeRequest, 'write'); collectorExporterConfig = { headers: { foo: 'bar', @@ -126,7 +123,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should open the connection', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -140,7 +137,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should set custom headers', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -151,7 +148,7 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should have keep alive and keepAliveMsecs option set', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); setTimeout(() => { const args = stubRequest.args[0]; @@ -164,8 +161,8 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('different http export requests should use the same agent', done => { - collectorExporter.export(spans, () => {}); - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); + collectorExporter.export(spans, () => { }); setTimeout(() => { const [firstExportAgent, secondExportAgent] = stubRequest.args.map( @@ -177,12 +174,14 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = buff.toString(); - setTimeout(() => { - const writeArgs = stubWrite.args[0]; const json = JSON.parse( - writeArgs[0] + responseBody ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; const span1 = json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; @@ -195,6 +194,10 @@ describe('CollectorTraceExporter - node with json over http', () => { done(); }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); }); it('should log the successful message', done => { @@ -242,6 +245,55 @@ describe('CollectorTraceExporter - node with json over http', () => { }); }); }); + + describe('export - with compression', () => { + beforeEach(() => { + stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + hostname: 'foo', + attributes: {}, + url: 'http://foo.bar.com', + keepAlive: true, + compress: true, + httpAgentOptions: { keepAliveMsecs: 2000 }, + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + + it('should successfully send the spans', done => { + collectorExporter.export(spans, () => { }); + let buff = Buffer.from(''); + + fakeRequest.on('end', () => { + const responseBody = zlib.gunzipSync(buff).toString(); + + const json = JSON.parse( + responseBody + ) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + }); + + }); + describe('CollectorTraceExporter - node (getDefaultUrl)', () => { it('should default to localhost', done => { const collectorExporter = new CollectorTraceExporter(); From 62a34eec099cee81caf696b9bb9c13dd9b84407f Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Thu, 8 Jul 2021 18:14:33 +0200 Subject: [PATCH 2/7] feat(exporter-collector-proto): support gzip compression --- .../src/CollectorExporterNodeBase.ts | 14 ++- .../src/util.ts | 2 + .../test/CollectorTraceExporter.test.ts | 110 +++++++++++++----- .../test/node/CollectorTraceExporter.test.ts | 3 +- 4 files changed, 99 insertions(+), 30 deletions(-) diff --git a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts index 7fbb4f6057..ed68464f26 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts @@ -22,6 +22,12 @@ import { } from '@opentelemetry/exporter-collector'; import { ServiceClientType } from './types'; +type SendFn = (collector: CollectorExporterNodeBase, + objects: ExportItem[], + compress: boolean, + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void) => void; + /** * Collector Metric Exporter abstract base class */ @@ -29,7 +35,11 @@ export abstract class CollectorExporterNodeBase< ExportItem, ServiceRequest > extends CollectorExporterBaseMain { - private _send!: Function; + private _send!: SendFn; + + constructor(config: CollectorExporterNodeConfigBase = {}) { + super(config) + } private _sendPromise( objects: ExportItem[], @@ -51,7 +61,7 @@ export abstract class CollectorExporterNodeBase< this._sendingPromises.splice(index, 1); }; - this._send(this, objects, _onSuccess, _onError); + this._send(this, objects, this.compress, _onSuccess, _onError); }); this._sendingPromises.push(promise); diff --git a/packages/opentelemetry-exporter-collector-proto/src/util.ts b/packages/opentelemetry-exporter-collector-proto/src/util.ts index f944d91512..248bbba238 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/util.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/util.ts @@ -63,6 +63,7 @@ export function onInit( export function send( collector: CollectorExporterNodeBase, objects: ExportItem[], + compress: boolean, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -76,6 +77,7 @@ export function send( collector, Buffer.from(body), 'application/x-protobuf', + compress, onSuccess, onError ); diff --git a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts index d33e8563b1..a8c83aac24 100644 --- a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts @@ -24,6 +24,8 @@ import { ReadableSpan } from '@opentelemetry/tracing'; import * as assert from 'assert'; import * as http from 'http'; import * as sinon from 'sinon'; +import { Stream } from 'stream'; +import * as zlib from 'zlib'; import { CollectorTraceExporter } from '../src'; import { getExportRequestProto } from '../src/util'; import { @@ -34,9 +36,9 @@ import { } from './helper'; const fakeRequest = { - end: function () {}, - on: function () {}, - write: function () {}, + end: function () { }, + on: function () { }, + write: function () { }, }; describe('CollectorTraceExporter - node with proto over http', () => { @@ -104,7 +106,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should open the connection', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.hostname, 'foo.bar.com'); @@ -116,7 +118,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should set custom headers', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.headers['foo'], 'bar'); @@ -126,7 +128,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should have keep alive and keepAliveMsecs option set', done => { - collectorExporter.export(spans, () => {}); + collectorExporter.export(spans, () => { }); sinon.stub(http, 'request').callsFake((options: any) => { assert.strictEqual(options.agent.keepAlive, true); @@ -137,27 +139,31 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => {}); - - sinon.stub(http, 'request').returns({ - end: () => {}, - on: () => {}, - write: (...args: any[]) => { - const ExportTraceServiceRequestProto = getExportRequestProto(); - const data = ExportTraceServiceRequestProto?.decode(args[0]); - const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; - const span1 = - json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; - assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); - if (span1) { - ensureProtoSpanIsCorrect(span1); - } - - ensureExportTraceServiceRequestIsSet(json); - - done(); - }, - } as any); + const fakeRequest = new Stream.PassThrough(); + sinon.stub(http, 'request').returns(fakeRequest as any); + + let buff = Buffer.from(''); + fakeRequest.on('end', () => { + const ExportTraceServiceRequestProto = getExportRequestProto(); + const data = ExportTraceServiceRequestProto?.decode(buff); + const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureProtoSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + + collectorExporter.export(spans, () => { }); }); it('should log the successful message', done => { @@ -195,4 +201,54 @@ describe('CollectorTraceExporter - node with proto over http', () => { }); }); }); + describe('export - with compression', () => { + beforeEach(() => { + collectorExporterConfig = { + headers: { + foo: 'bar', + }, + hostname: 'foo', + attributes: {}, + url: 'http://foo.bar.com', + keepAlive: true, + compress: true, + httpAgentOptions: { keepAliveMsecs: 2000 }, + }; + collectorExporter = new CollectorTraceExporter(collectorExporterConfig); + spans = []; + spans.push(Object.assign({}, mockedReadableSpan)); + }); + afterEach(() => { + sinon.restore(); + }); + + it('should successfully send the spans', done => { + const fakeRequest = new Stream.PassThrough(); + sinon.stub(http, 'request').returns(fakeRequest as any); + + let buff = Buffer.from(''); + fakeRequest.on('end', () => { + const unzippedBuff = zlib.gunzipSync(buff); + const ExportTraceServiceRequestProto = getExportRequestProto(); + const data = ExportTraceServiceRequestProto?.decode(unzippedBuff); + const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest; + const span1 = + json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0]; + assert.ok(typeof span1 !== 'undefined', "span doesn't exist"); + if (span1) { + ensureProtoSpanIsCorrect(span1); + } + + ensureExportTraceServiceRequestIsSet(json); + + done(); + }); + + fakeRequest.on('data', chunk => { + buff = Buffer.concat([buff, chunk]); + }); + + collectorExporter.export(spans, () => { }); + }); + }); }); diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index 72bc1591aa..43b17d426d 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -174,7 +174,6 @@ describe('CollectorTraceExporter - node with json over http', () => { }); it('should successfully send the spans', done => { - collectorExporter.export(spans, () => { }); let buff = Buffer.from(''); fakeRequest.on('end', () => { @@ -198,6 +197,8 @@ describe('CollectorTraceExporter - node with json over http', () => { fakeRequest.on('data', chunk => { buff = Buffer.concat([buff, chunk]); }); + + collectorExporter.export(spans, () => { }); }); it('should log the successful message', done => { From ec88f76a2924e32c4c081567830a58e3be3918a2 Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Thu, 8 Jul 2021 18:43:49 +0200 Subject: [PATCH 3/7] fix(exporter-collector): node8 compatibility fix for exporter gzip compression --- .../src/platform/node/util.ts | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index 2773824a49..4070f6abd6 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -17,7 +17,7 @@ import * as url from 'url'; import * as http from 'http'; import * as https from 'https'; import * as zlib from 'zlib'; -import { pipeline, Readable } from 'stream'; +import { Readable } from 'stream'; import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterNodeConfigBase } from '.'; @@ -83,26 +83,24 @@ export function sendWithHttp( }); if (compress) { - const dataStream = Readable.from(data); - pipeline(dataStream, gzip, req, onGzipError(onError)); + const dataStream = readableFromBuffer(data); + dataStream.on('error', onError) + .pipe(gzip).on('error', onError) + .pipe(req); } else { req.write(data); req.end(); } } -function onGzipError(onError: (error: collectorTypes.CollectorExporterError) => void) { - return (err: NodeJS.ErrnoException | null) => { - const error = new collectorTypes.CollectorExporterError( - err?.message, - 500, - 'Compressing the request body for collector exporter failed.' - ); - onError(error) - } +function readableFromBuffer(buff: string | Buffer): Readable { + const readable = new Readable(); + readable.push(buff); + readable.push(null); + + return readable; } - export function createHttpAgent( config: CollectorExporterNodeConfigBase ): http.Agent | https.Agent | undefined { From 446c71bc92ca3e9894d6222eb097c632ae20ba74 Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Mon, 12 Jul 2021 11:58:46 +0200 Subject: [PATCH 4/7] feat(exporter-collector): change compression config to enum --- .../src/CollectorExporterNodeBase.ts | 5 ++-- .../src/util.ts | 5 ++-- .../test/CollectorTraceExporter.test.ts | 3 ++- .../node/CollectorExporterNodeBase.ts | 8 +++--- .../src/platform/node/types.ts | 7 ++++- .../src/platform/node/util.ts | 27 ++++++++++++------- .../test/node/CollectorTraceExporter.test.ts | 3 ++- 7 files changed, 37 insertions(+), 21 deletions(-) diff --git a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts index ed68464f26..3f2e4c53bc 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/CollectorExporterNodeBase.ts @@ -19,12 +19,13 @@ import { CollectorExporterNodeBase as CollectorExporterBaseMain, collectorTypes, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import { ServiceClientType } from './types'; type SendFn = (collector: CollectorExporterNodeBase, objects: ExportItem[], - compress: boolean, + compression: CompressionAlgorithm, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void) => void; @@ -61,7 +62,7 @@ export abstract class CollectorExporterNodeBase< this._sendingPromises.splice(index, 1); }; - this._send(this, objects, this.compress, _onSuccess, _onError); + this._send(this, objects, this.compression, _onSuccess, _onError); }); this._sendingPromises.push(promise); diff --git a/packages/opentelemetry-exporter-collector-proto/src/util.ts b/packages/opentelemetry-exporter-collector-proto/src/util.ts index 248bbba238..73e3caf518 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/util.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/util.ts @@ -18,6 +18,7 @@ import { collectorTypes, sendWithHttp, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import * as path from 'path'; @@ -63,7 +64,7 @@ export function onInit( export function send( collector: CollectorExporterNodeBase, objects: ExportItem[], - compress: boolean, + compression: CompressionAlgorithm, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -77,7 +78,7 @@ export function send( collector, Buffer.from(body), 'application/x-protobuf', - compress, + compression, onSuccess, onError ); diff --git a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts index a8c83aac24..44bc9d25c7 100644 --- a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts @@ -19,6 +19,7 @@ import { ExportResultCode } from '@opentelemetry/core'; import { CollectorExporterNodeConfigBase, collectorTypes, + CompressionAlgorithm, } from '@opentelemetry/exporter-collector'; import { ReadableSpan } from '@opentelemetry/tracing'; import * as assert from 'assert'; @@ -211,7 +212,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { attributes: {}, url: 'http://foo.bar.com', keepAlive: true, - compress: true, + compression: CompressionAlgorithm.GZIP, httpAgentOptions: { keepAliveMsecs: 2000 }, }; collectorExporter = new CollectorTraceExporter(collectorExporterConfig); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 3b245a3dbf..178e82c782 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -18,7 +18,7 @@ import type * as http from 'http'; import type * as https from 'https'; import { CollectorExporterBase } from '../../CollectorExporterBase'; -import { CollectorExporterNodeConfigBase } from './types'; +import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types'; import * as collectorTypes from '../../types'; import { parseHeaders } from '../../util'; import { createHttpAgent, sendWithHttp } from './util'; @@ -39,7 +39,7 @@ export abstract class CollectorExporterNodeBase< DEFAULT_HEADERS: Record = {}; headers: Record; agent: http.Agent | https.Agent | undefined; - compress: boolean; + compression: CompressionAlgorithm; constructor(config: CollectorExporterNodeConfigBase = {}) { super(config); @@ -52,7 +52,7 @@ export abstract class CollectorExporterNodeBase< baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS) ); this.agent = createHttpAgent(config); - this.compress = config.compress || false; + this.compression = config.compression || CompressionAlgorithm.NONE; } onInit(_config: CollectorExporterNodeConfigBase): void { @@ -88,7 +88,7 @@ export abstract class CollectorExporterNodeBase< this, JSON.stringify(serviceRequest), 'application/json', - this.compress, + this.compression, _onSuccess, _onError ); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts index df42ea72c5..4ee5167e79 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/types.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/types.ts @@ -24,6 +24,11 @@ import { CollectorExporterConfigBase } from '../../types'; export interface CollectorExporterNodeConfigBase extends CollectorExporterConfigBase { keepAlive?: boolean; - compress?: boolean; + compression?: CompressionAlgorithm; httpAgentOptions?: http.AgentOptions | https.AgentOptions; } + +export enum CompressionAlgorithm { + NONE = 'none', + GZIP = 'gzip' +} diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index 4070f6abd6..3318908b83 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -22,6 +22,7 @@ import * as collectorTypes from '../../types'; import { CollectorExporterNodeBase } from './CollectorExporterNodeBase'; import { CollectorExporterNodeConfigBase } from '.'; import { diag } from '@opentelemetry/api'; +import { CompressionAlgorithm } from './types'; const gzip = zlib.createGzip(); @@ -37,7 +38,7 @@ export function sendWithHttp( collector: CollectorExporterNodeBase, data: string | Buffer, contentType: string, - compress: boolean, + compression: CompressionAlgorithm, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -82,14 +83,20 @@ export function sendWithHttp( onError(error); }); - if (compress) { - const dataStream = readableFromBuffer(data); - dataStream.on('error', onError) - .pipe(gzip).on('error', onError) - .pipe(req); - } else { - req.write(data); - req.end(); + switch (compression) { + case CompressionAlgorithm.GZIP: { + const dataStream = readableFromBuffer(data); + dataStream.on('error', onError) + .pipe(gzip).on('error', onError) + .pipe(req); + + break; + } + default: + req.write(data); + req.end(); + + break; } } @@ -97,7 +104,7 @@ function readableFromBuffer(buff: string | Buffer): Readable { const readable = new Readable(); readable.push(buff); readable.push(null); - + return readable; } diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index 43b17d426d..4b55620ec6 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -25,6 +25,7 @@ import * as zlib from 'zlib'; import { CollectorTraceExporter, CollectorExporterNodeConfigBase, + CompressionAlgorithm, } from '../../src/platform/node'; import * as collectorTypes from '../../src/types'; import { MockedResponse } from './nodeHelpers'; @@ -258,7 +259,7 @@ describe('CollectorTraceExporter - node with json over http', () => { attributes: {}, url: 'http://foo.bar.com', keepAlive: true, - compress: true, + compression: CompressionAlgorithm.GZIP, httpAgentOptions: { keepAliveMsecs: 2000 }, }; collectorExporter = new CollectorTraceExporter(collectorExporterConfig); From 6bceae04a50f3626a6273a6660090f231cfff984 Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Mon, 12 Jul 2021 20:58:51 +0200 Subject: [PATCH 5/7] refactor(exporter-collector): use compression config from collector instance in sendWithHttp --- packages/opentelemetry-exporter-collector-proto/src/util.ts | 1 - .../src/platform/node/CollectorExporterNodeBase.ts | 1 - .../opentelemetry-exporter-collector/src/platform/node/util.ts | 3 +-- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/opentelemetry-exporter-collector-proto/src/util.ts b/packages/opentelemetry-exporter-collector-proto/src/util.ts index 73e3caf518..e9e9ea8acc 100644 --- a/packages/opentelemetry-exporter-collector-proto/src/util.ts +++ b/packages/opentelemetry-exporter-collector-proto/src/util.ts @@ -78,7 +78,6 @@ export function send( collector, Buffer.from(body), 'application/x-protobuf', - compression, onSuccess, onError ); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts index 178e82c782..10a5bdca19 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/CollectorExporterNodeBase.ts @@ -88,7 +88,6 @@ export abstract class CollectorExporterNodeBase< this, JSON.stringify(serviceRequest), 'application/json', - this.compression, _onSuccess, _onError ); diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index 3318908b83..482ac75f99 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -38,7 +38,6 @@ export function sendWithHttp( collector: CollectorExporterNodeBase, data: string | Buffer, contentType: string, - compression: CompressionAlgorithm, onSuccess: () => void, onError: (error: collectorTypes.CollectorExporterError) => void ): void { @@ -83,7 +82,7 @@ export function sendWithHttp( onError(error); }); - switch (compression) { + switch (collector.compression) { case CompressionAlgorithm.GZIP: { const dataStream = readableFromBuffer(data); dataStream.on('error', onError) From 8805cea20f4081a4af03e3fc74522efc019e3f89 Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Mon, 12 Jul 2021 21:48:39 +0200 Subject: [PATCH 6/7] fix(exporter-collector): set content-encoding header only when compressing --- .../src/platform/node/util.ts | 2 +- .../test/node/CollectorTraceExporter.test.ts | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts index 482ac75f99..c7848014af 100644 --- a/packages/opentelemetry-exporter-collector/src/platform/node/util.ts +++ b/packages/opentelemetry-exporter-collector/src/platform/node/util.ts @@ -51,7 +51,6 @@ export function sendWithHttp( headers: { 'Content-Length': Buffer.byteLength(data), 'Content-Type': contentType, - 'Content-Encoding': 'gzip', ...collector.headers, }, agent: collector.agent, @@ -84,6 +83,7 @@ export function sendWithHttp( switch (collector.compression) { case CompressionAlgorithm.GZIP: { + req.setHeader('Content-Encoding', 'gzip'); const dataStream = readableFromBuffer(data); dataStream.on('error', onError) .pipe(gzip).on('error', onError) diff --git a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts index 4b55620ec6..f8215d6494 100644 --- a/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/node/CollectorTraceExporter.test.ts @@ -44,6 +44,7 @@ describe('CollectorTraceExporter - node with json over http', () => { let collectorExporter: CollectorTraceExporter; let collectorExporterConfig: CollectorExporterNodeConfigBase; let stubRequest: sinon.SinonStub; + let spySetHeader: sinon.SinonSpy; let spans: ReadableSpan[]; afterEach(() => { @@ -148,6 +149,17 @@ describe('CollectorTraceExporter - node with json over http', () => { }); }); + it('should not have Content-Encoding header', done => { + collectorExporter.export(spans, () => { }); + + setTimeout(() => { + const args = stubRequest.args[0]; + const options = args[0]; + assert.strictEqual(options.headers['Content-Encoding'], undefined); + done(); + }); + }); + it('should have keep alive and keepAliveMsecs option set', done => { collectorExporter.export(spans, () => { }); @@ -251,6 +263,8 @@ describe('CollectorTraceExporter - node with json over http', () => { describe('export - with compression', () => { beforeEach(() => { stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any); + spySetHeader = sinon.spy(); + (fakeRequest as any).setHeader = spySetHeader; collectorExporterConfig = { headers: { foo: 'bar', @@ -285,6 +299,7 @@ describe('CollectorTraceExporter - node with json over http', () => { } ensureExportTraceServiceRequestIsSet(json); + assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip')); done(); }); From 020f775d42f1912809d4e6776503be0493bda72e Mon Sep 17 00:00:00 2001 From: Ali Sabzevari Date: Tue, 13 Jul 2021 19:07:51 +0200 Subject: [PATCH 7/7] fix(exporter-collector-proto): fix test for collector when compression enabled --- .../test/CollectorTraceExporter.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts index 44bc9d25c7..3a8d2a6758 100644 --- a/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector-proto/test/CollectorTraceExporter.test.ts @@ -226,6 +226,8 @@ describe('CollectorTraceExporter - node with proto over http', () => { it('should successfully send the spans', done => { const fakeRequest = new Stream.PassThrough(); sinon.stub(http, 'request').returns(fakeRequest as any); + const spySetHeader = sinon.spy(); + (fakeRequest as any).setHeader = spySetHeader; let buff = Buffer.from(''); fakeRequest.on('end', () => { @@ -241,6 +243,7 @@ describe('CollectorTraceExporter - node with proto over http', () => { } ensureExportTraceServiceRequestIsSet(json); + assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip')); done(); });