Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support gzip compression for node exporter collector #2337

Merged
Merged
Expand Up @@ -19,17 +19,28 @@ import {
CollectorExporterNodeBase as CollectorExporterBaseMain,
collectorTypes,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

type SendFn = <ExportItem, ServiceRequest>(collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void) => void;

/**
* Collector Metric Exporter abstract base class
*/
export abstract class CollectorExporterNodeBase<
ExportItem,
ServiceRequest
> extends CollectorExporterBaseMain<ExportItem, ServiceRequest> {
private _send!: Function;
private _send!: SendFn;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config)
}

private _sendPromise(
objects: ExportItem[],
Expand All @@ -51,7 +62,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, _onSuccess, _onError);
this._send(this, objects, this.compression, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
Expand Down
3 changes: 3 additions & 0 deletions packages/opentelemetry-exporter-collector-proto/src/util.ts
Expand Up @@ -18,6 +18,7 @@ import {
collectorTypes,
sendWithHttp,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import * as path from 'path';

Expand Down Expand Up @@ -63,6 +64,7 @@ export function onInit<ExportItem, ServiceRequest>(
export function send<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand All @@ -76,6 +78,7 @@ export function send<ExportItem, ServiceRequest>(
collector,
Buffer.from(body),
'application/x-protobuf',
compression,
onSuccess,
onError
);
Expand Down
Expand Up @@ -19,11 +19,14 @@ import { ExportResultCode } from '@opentelemetry/core';
import {
CollectorExporterNodeConfigBase,
collectorTypes,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/tracing';
import * as assert from 'assert';
import * as http from 'http';
import * as sinon from 'sinon';
import { Stream } from 'stream';
import * as zlib from 'zlib';
import { CollectorTraceExporter } from '../src';
import { getExportRequestProto } from '../src/util';
import {
Expand All @@ -34,9 +37,9 @@ import {
} from './helper';

const fakeRequest = {
end: function () {},
on: function () {},
write: function () {},
end: function () { },
on: function () { },
write: function () { },
};

describe('CollectorTraceExporter - node with proto over http', () => {
Expand Down Expand Up @@ -104,7 +107,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should open the connection', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.hostname, 'foo.bar.com');
Expand All @@ -116,7 +119,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should set custom headers', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.headers['foo'], 'bar');
Expand All @@ -126,7 +129,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should have keep alive and keepAliveMsecs option set', done => {
collectorExporter.export(spans, () => {});
collectorExporter.export(spans, () => { });

sinon.stub(http, 'request').callsFake((options: any) => {
assert.strictEqual(options.agent.keepAlive, true);
Expand All @@ -137,27 +140,31 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});

it('should successfully send the spans', done => {
collectorExporter.export(spans, () => {});

sinon.stub(http, 'request').returns({
end: () => {},
on: () => {},
write: (...args: any[]) => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(args[0]);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
},
} as any);
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(buff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});

it('should log the successful message', done => {
Expand Down Expand Up @@ -195,4 +202,54 @@ describe('CollectorTraceExporter - node with proto over http', () => {
});
});
});
describe('export - with compression', () => {
beforeEach(() => {
collectorExporterConfig = {
headers: {
foo: 'bar',
},
hostname: 'foo',
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compression: CompressionAlgorithm.GZIP,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
spans = [];
spans.push(Object.assign({}, mockedReadableSpan));
});
afterEach(() => {
sinon.restore();
});

it('should successfully send the spans', done => {
const fakeRequest = new Stream.PassThrough();
sinon.stub(http, 'request').returns(fakeRequest as any);

let buff = Buffer.from('');
fakeRequest.on('end', () => {
const unzippedBuff = zlib.gunzipSync(buff);
const ExportTraceServiceRequestProto = getExportRequestProto();
const data = ExportTraceServiceRequestProto?.decode(unzippedBuff);
const json = data?.toJSON() as collectorTypes.opentelemetryProto.collector.trace.v1.ExportTraceServiceRequest;
const span1 =
json.resourceSpans[0].instrumentationLibrarySpans[0].spans[0];
assert.ok(typeof span1 !== 'undefined', "span doesn't exist");
if (span1) {
ensureProtoSpanIsCorrect(span1);
}

ensureExportTraceServiceRequestIsSet(json);

done();
});

fakeRequest.on('data', chunk => {
buff = Buffer.concat([buff, chunk]);
});

collectorExporter.export(spans, () => { });
});
});
});
Expand Up @@ -18,7 +18,7 @@ import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterBase } from '../../CollectorExporterBase';
import { CollectorExporterNodeConfigBase } from './types';
import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { createHttpAgent, sendWithHttp } from './util';
Expand All @@ -39,6 +39,7 @@ export abstract class CollectorExporterNodeBase<
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
agent: http.Agent | https.Agent | undefined;
compression: CompressionAlgorithm;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
Expand All @@ -51,6 +52,7 @@ export abstract class CollectorExporterNodeBase<
baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS)
);
this.agent = createHttpAgent(config);
this.compression = config.compression || CompressionAlgorithm.NONE;
}

onInit(_config: CollectorExporterNodeConfigBase): void {
Expand Down Expand Up @@ -86,6 +88,7 @@ export abstract class CollectorExporterNodeBase<
this,
JSON.stringify(serviceRequest),
'application/json',
this.compression,
_onSuccess,
_onError
);
Expand Down
Expand Up @@ -24,5 +24,11 @@ import { CollectorExporterConfigBase } from '../../types';
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
compression?: CompressionAlgorithm;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}

export enum CompressionAlgorithm {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General comment, why define the enum values as strings?
Why not just a numeric value as the "NAME" will still be available is the resulting code and if required we can always look it up in config via uppercasing ... Just a thought.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally i advise using string to avoid problem when trying to update available value (that can share the same number even if values are different across versions)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as I know normal enums are hard to use for non typescript users.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Flarna I know that's true for const enums but I'm not sure it's the case for regular enums. Do you have a particular case you're thinking of?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I know what you mean after thinking a little more. You're not thinking they can't do CompressionAlgorithm.GZIP but that 1 doesn't mean anything to them and passing a string 'gzip' is more clear?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normal enums are exported as objects so a JS user can use CompressionAlgorithm.GZIP. A JS user can also use the underlying constant (e.g. gzip).
The main difference is documentation. A typescript user can't to much wrong as editor will tell the allowed values to use and compiler checks again.

For JS users it's needed to document that an object named CompressionAlgorithm is exported incl. the properties (GZIP and NONE) referring to the constants to use.

This works fine but often JS APIs document the actual values to use instead referring to an object holding the constants (would be gzip/none here).

Standard const enums are really bad as JS users have to use the numeric values then.
const enums with strings as values would require JS users to use these strings in their code.

NONE = 'none',
GZIP = 'gzip'
}
Expand Up @@ -16,10 +16,15 @@
import * as url from 'url';
import * as http from 'http';
import * as https from 'https';
import * as zlib from 'zlib';
import { Readable } from 'stream';
import * as collectorTypes from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from '.';
import { diag } from '@opentelemetry/api';
import { CompressionAlgorithm } from './types';

const gzip = zlib.createGzip();

/**
* Sends data using http
Expand All @@ -33,6 +38,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
data: string | Buffer,
contentType: string,
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand All @@ -46,6 +52,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
headers: {
'Content-Length': Buffer.byteLength(data),
'Content-Type': contentType,
'Content-Encoding': 'gzip',
obecny marked this conversation as resolved.
Show resolved Hide resolved
...collector.headers,
},
agent: collector.agent,
Expand All @@ -71,11 +78,34 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
});
});


req.on('error', (error: Error) => {
onError(error);
});
req.write(data);
req.end();

switch (compression) {
case CompressionAlgorithm.GZIP: {
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);

break;
}
default:
req.write(data);
req.end();

break;
}
}

function readableFromBuffer(buff: string | Buffer): Readable {
const readable = new Readable();
readable.push(buff);
readable.push(null);

return readable;
}

export function createHttpAgent(
Expand Down