Skip to content

Commit

Permalink
Support gzip compression for node exporter collector (#2337)
Browse files Browse the repository at this point in the history
Co-authored-by: Bartlomiej Obecny <bobecny@gmail.com>
Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 27, 2021
1 parent d8fbedd commit ec88344
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 48 deletions.
Expand Up @@ -19,17 +19,28 @@ import {
CollectorExporterNodeBase as CollectorExporterBaseMain,
collectorTypes,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

type SendFn = <ExportItem, ServiceRequest>(collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void) => void;

/**
* Collector Metric Exporter abstract base class
*/
export abstract class CollectorExporterNodeBase<
ExportItem,
ServiceRequest
> extends CollectorExporterBaseMain<ExportItem, ServiceRequest> {
private _send!: Function;
private _send!: SendFn;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config)
}

private _sendPromise(
objects: ExportItem[],
Expand All @@ -51,7 +62,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
this._send(this, objects, this.compression, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
Expand Down
2 changes: 2 additions & 0 deletions packages/opentelemetry-exporter-collector-proto/src/util.ts
Expand Up @@ -18,6 +18,7 @@ import {
collectorTypes,
sendWithHttp,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import * as path from 'path';

Expand Down Expand Up @@ -63,6 +64,7 @@ export function onInit<ExportItem, ServiceRequest>(
export function send<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand Down
Expand Up @@ -19,11 +19,14 @@ import { ExportResultCode } from '@opentelemetry/core';
import {
CollectorExporterNodeConfigBase,
collectorTypes,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/tracing';
import * as assert from 'assert';
import * as http from 'http';
import * as sinon from 'sinon';
import { Stream } from 'stream';
import * as zlib from 'zlib';
import { CollectorTraceExporter } from '../src';
import { getExportRequestProto } from '../src/util';
import {
Expand All @@ -34,9 +37,9 @@ import {
} from './helper';

const fakeRequest = {
end: function () {},
on: function () {},
write: function () {},
end: function () { },
on: function () { },
write: function () { },
};

describe('CollectorTraceExporter - node with proto over http', () => {
Expand Down Expand Up @@ -104,7 +107,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should open the connection', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.hostname, 'foo.bar.com');
Expand All @@ -116,7 +119,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should set custom headers', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.headers['foo'], 'bar');
Expand All @@ -126,7 +129,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.agent.keepAlive, true);
Expand All @@ -137,27 +140,31 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});

sinon.stub(http, 'request').returns({
end: () => {},
on: () => {},
write: (...args: any[]) => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(args[0]);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
},
} as any);
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(buff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});

it('should log the successful message', done => {
Expand Down Expand Up @@ -195,4 +202,57 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});
});
});
describe('export - with compression', () => {
beforeEach(() => {
collectorExporterConfig = {
headers: {
foo: 'bar',
},
hostname: 'foo',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compression: CompressionAlgorithm.GZIP,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
spans.push(Object.assign({}, mockedReadableSpan));
});
afterEach(() => {
sinon.restore();
});

it('should successfully send the spans', done => {
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);
const spySetHeader = sinon.spy();
(fakeRequest as any).setHeader = spySetHeader;

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const unzippedBuff = zlib.gunzipSync(buff);
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(unzippedBuff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);
assert.ok(spySetHeader.calledWith('Content-Encoding', 'gzip'));

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});
});
});
Expand Up @@ -18,7 +18,7 @@ import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterBase } from '../../CollectorExporterBase';
import { CollectorExporterNodeConfigBase } from './types';
import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { createHttpAgent, sendWithHttp } from './util';
Expand All @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase<
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
agent: http.Agent | https.Agent | undefined;
compression: CompressionAlgorithm;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
Expand All @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase<
baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS)
);
this.agent = createHttpAgent(config);
this.compression = config.compression || CompressionAlgorithm.NONE;
}

onInit(_config: CollectorExporterNodeConfigBase): void {
Expand Down
Expand Up @@ -24,5 +24,11 @@ import { CollectorExporterConfigBase } from '../../types';
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
compression?: CompressionAlgorithm;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}

export enum CompressionAlgorithm {
NONE = 'none',
GZIP = 'gzip'
}
Expand Up @@ -16,10 +16,15 @@
import * as url from 'url';
import * as http from 'http';
import * as https from 'https';
import * as zlib from 'zlib';
import { Readable } from 'stream';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from '.';
import { diag } from '@opentelemetry/api';
import { CompressionAlgorithm } from './types';

const gzip = zlib.createGzip();

/**
* Sends data using http
Expand Down Expand Up @@ -71,11 +76,35 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
});
});


req.on('error', (error: Error) => {
onError(error);
});
req.write(data);
req.end();

switch (collector.compression) {
case CompressionAlgorithm.GZIP: {
req.setHeader('Content-Encoding', 'gzip');
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);

break;
}
default:
req.write(data);
req.end();

break;
}
}

function readableFromBuffer(buff: string | Buffer): Readable {
const readable = new Readable();
readable.push(buff);
readable.push(null);

return readable;
}

export function createHttpAgent(
Expand Down

0 comments on commit ec88344

Please sign in to comment.