Skip to content

Commit

Permalink
feat(exporter-collector): support gzip compression for node exporter …
Browse files Browse the repository at this point in the history
…collector
  • Loading branch information
alisabzevari committed Jul 8, 2021
1 parent 3a327dd commit a10c50e
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 18 deletions.
Expand Up @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase<
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
agent: http.Agent | https.Agent | undefined;
compress: boolean;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
Expand All @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase<
baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS)
);
this.agent = createHttpAgent(config);
this.compress = config.compress || false;
}

onInit(_config: CollectorExporterNodeConfigBase): void {
Expand Down Expand Up @@ -86,6 +88,7 @@ export abstract class CollectorExporterNodeBase<
this,
JSON.stringify(serviceRequest),
'application/json',
this.compress,
_onSuccess,
_onError
);
Expand Down
Expand Up @@ -24,5 +24,6 @@ import { CollectorExporterConfigBase } from '../../types';
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
compress?: boolean;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}
Expand Up @@ -16,11 +16,15 @@
import * as url from 'url';
import * as http from 'http';
import * as https from 'https';
import * as zlib from 'zlib';
import { pipeline, Readable } from 'stream';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from '.';
import { diag } from '@opentelemetry/api';

const gzip = zlib.createGzip();

/**
* Sends data using http
* @param collector
Expand All @@ -33,6 +37,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
data: string | Buffer,
contentType: string,
compress: boolean,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand All @@ -46,6 +51,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
headers: {
'Content-Length': Buffer.byteLength(data),
'Content-Type': contentType,
'Content-Encoding': 'gzip',
...collector.headers,
},
agent: collector.agent,
Expand All @@ -71,13 +77,32 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
});
});


req.on('error', (error: Error) => {
onError(error);
});
req.write(data);
req.end();

if (compress) {
const dataStream = Readable.from(data);
pipeline(dataStream, gzip, req, onGzipError(onError));
} else {
req.write(data);
req.end();
}
}

function onGzipError(onError: (error: collectorTypes.CollectorExporterError) => void) {
return (err: NodeJS.ErrnoException | null) => {
const error = new collectorTypes.CollectorExporterError(
err?.message,
500,
'Compressing the request body for collector exporter failed.'
);
onError(error)
}
}


export function createHttpAgent(
config: CollectorExporterNodeConfigBase
): http.Agent | https.Agent | undefined {
Expand Down
Expand Up @@ -20,6 +20,8 @@ import { ReadableSpan } from '@opentelemetry/tracing';
import * as http from 'http';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { PassThrough, Stream } from 'stream';
import * as zlib from 'zlib';
import {
CollectorTraceExporter,
CollectorExporterNodeConfigBase,
Expand All @@ -33,22 +35,18 @@ import {
mockedReadableSpan,
} from '../helper';

const fakeRequest = {
end: function () {},
on: function () {},
write: function () {},
};
let fakeRequest: PassThrough;

const address = 'localhost:1501';

describe('CollectorTraceExporter - node with json over http', () => {
let collectorExporter: CollectorTraceExporter;
let collectorExporterConfig: CollectorExporterNodeConfigBase;
let stubRequest: sinon.SinonStub;
let stubWrite: sinon.SinonStub;
let spans: ReadableSpan[];

afterEach(() => {
fakeRequest = new Stream.PassThrough();
sinon.restore();
});

Expand Down Expand Up @@ -109,7 +107,6 @@ describe('CollectorTraceExporter - node with json over http', () => {
describe('export', () => {
beforeEach(() => {
stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any);
stubWrite = sinon.stub(fakeRequest, 'write');
collectorExporterConfig = {
headers: {
foo: 'bar',
Expand All @@ -126,7 +123,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('should open the connection', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

setTimeout(() => {
const args = stubRequest.args[0];
Expand All @@ -140,7 +137,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('should set custom headers', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

setTimeout(() => {
const args = stubRequest.args[0];
Expand All @@ -151,7 +148,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

setTimeout(() => {
const args = stubRequest.args[0];
Expand All @@ -164,8 +161,8 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('different http export requests should use the same agent', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });
collectorExporter.export(spans, () => { });

setTimeout(() => {
const [firstExportAgent, secondExportAgent] = stubRequest.args.map(
Expand All @@ -177,12 +174,14 @@ describe('CollectorTraceExporter - node with json over http', () => {
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });
let buff = Buffer.from('');

fakeRequest.on('end', () => {
const responseBody = buff.toString();

setTimeout(() => {
const writeArgs = stubWrite.args[0];
const json = JSON.parse(
writeArgs[0]
responseBody
) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
Expand All @@ -195,6 +194,10 @@ describe('CollectorTraceExporter - node with json over http', () => {

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});
});

it('should log the successful message', done => {
Expand Down Expand Up @@ -242,6 +245,55 @@ describe('CollectorTraceExporter - node with json over http', () => {
});
});
});

describe('export - with compression', () => {
beforeEach(() => {
stubRequest = sinon.stub(http, 'request').returns(fakeRequest as any);
collectorExporterConfig = {
headers: {
foo: 'bar',
},
hostname: 'foo',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compress: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
spans.push(Object.assign({}, mockedReadableSpan));
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => { });
let buff = Buffer.from('');

fakeRequest.on('end', () => {
const responseBody = zlib.gunzipSync(buff).toString();

const json = JSON.parse(
responseBody
) as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});
});

});

describe('CollectorTraceExporter - node (getDefaultUrl)', () => {
it('should default to localhost', done => {
const collectorExporter = new CollectorTraceExporter();
Expand Down

0 comments on commit a10c50e

Please sign in to comment.