Skip to content

Commit

Permalink
feat(exporter-collector): change compression config to enum
Browse files Browse the repository at this point in the history
  • Loading branch information
alisabzevari committed Jul 12, 2021
1 parent ec88f76 commit 446c71b
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 21 deletions.
Expand Up @@ -19,12 +19,13 @@ import {
CollectorExporterNodeBase as CollectorExporterBaseMain,
collectorTypes,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ServiceClientType } from './types';

type SendFn = <ExportItem, ServiceRequest>(collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compress: boolean,
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void) => void;

Expand Down Expand Up @@ -61,7 +62,7 @@ export abstract class CollectorExporterNodeBase<
this._sendingPromises.splice(index, 1);
};

this._send(this, objects, this.compress, _onSuccess, _onError);
this._send(this, objects, this.compression, _onSuccess, _onError);
});

this._sendingPromises.push(promise);
Expand Down
5 changes: 3 additions & 2 deletions packages/opentelemetry-exporter-collector-proto/src/util.ts
Expand Up @@ -18,6 +18,7 @@ import {
collectorTypes,
sendWithHttp,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import * as path from 'path';

Expand Down Expand Up @@ -63,7 +64,7 @@ export function onInit<ExportItem, ServiceRequest>(
export function send<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
objects: ExportItem[],
compress: boolean,
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand All @@ -77,7 +78,7 @@ export function send<ExportItem, ServiceRequest>(
collector,
Buffer.from(body),
'application/x-protobuf',
compress,
compression,
onSuccess,
onError
);
Expand Down
Expand Up @@ -19,6 +19,7 @@ import { ExportResultCode } from '@opentelemetry/core';
import {
CollectorExporterNodeConfigBase,
collectorTypes,
CompressionAlgorithm,
} from '@opentelemetry/exporter-collector';
import { ReadableSpan } from '@opentelemetry/tracing';
import * as assert from 'assert';
Expand Down Expand Up @@ -211,7 +212,7 @@ describe('CollectorTraceExporter - node with proto over http', () => {
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compress: true,
compression: CompressionAlgorithm.GZIP,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
Expand Down
Expand Up @@ -18,7 +18,7 @@ import type * as http from 'http';
import type * as https from 'https';

import { CollectorExporterBase } from '../../CollectorExporterBase';
import { CollectorExporterNodeConfigBase } from './types';
import { CollectorExporterNodeConfigBase, CompressionAlgorithm } from './types';
import * as collectorTypes from '../../types';
import { parseHeaders } from '../../util';
import { createHttpAgent, sendWithHttp } from './util';
Expand All @@ -39,7 +39,7 @@ export abstract class CollectorExporterNodeBase<
DEFAULT_HEADERS: Record<string, string> = {};
headers: Record<string, string>;
agent: http.Agent | https.Agent | undefined;
compress: boolean;
compression: CompressionAlgorithm;

constructor(config: CollectorExporterNodeConfigBase = {}) {
super(config);
Expand All @@ -52,7 +52,7 @@ export abstract class CollectorExporterNodeBase<
baggageUtils.parseKeyPairsIntoRecord(getEnv().OTEL_EXPORTER_OTLP_HEADERS)
);
this.agent = createHttpAgent(config);
this.compress = config.compress || false;
this.compression = config.compression || CompressionAlgorithm.NONE;
}

onInit(_config: CollectorExporterNodeConfigBase): void {
Expand Down Expand Up @@ -88,7 +88,7 @@ export abstract class CollectorExporterNodeBase<
this,
JSON.stringify(serviceRequest),
'application/json',
this.compress,
this.compression,
_onSuccess,
_onError
);
Expand Down
Expand Up @@ -24,6 +24,11 @@ import { CollectorExporterConfigBase } from '../../types';
export interface CollectorExporterNodeConfigBase
extends CollectorExporterConfigBase {
keepAlive?: boolean;
compress?: boolean;
compression?: CompressionAlgorithm;
httpAgentOptions?: http.AgentOptions | https.AgentOptions;
}

export enum CompressionAlgorithm {
NONE = 'none',
GZIP = 'gzip'
}
27 changes: 17 additions & 10 deletions packages/opentelemetry-exporter-collector/src/platform/node/util.ts
Expand Up @@ -22,6 +22,7 @@ import * as collectorTypes from '../../types';
import { CollectorExporterNodeBase } from './CollectorExporterNodeBase';
import { CollectorExporterNodeConfigBase } from '.';
import { diag } from '@opentelemetry/api';
import { CompressionAlgorithm } from './types';

const gzip = zlib.createGzip();

Expand All @@ -37,7 +38,7 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
collector: CollectorExporterNodeBase<ExportItem, ServiceRequest>,
data: string | Buffer,
contentType: string,
compress: boolean,
compression: CompressionAlgorithm,
onSuccess: () => void,
onError: (error: collectorTypes.CollectorExporterError) => void
): void {
Expand Down Expand Up @@ -82,22 +83,28 @@ export function sendWithHttp<ExportItem, ServiceRequest>(
onError(error);
});

if (compress) {
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);
} else {
req.write(data);
req.end();
switch (compression) {
case CompressionAlgorithm.GZIP: {
const dataStream = readableFromBuffer(data);
dataStream.on('error', onError)
.pipe(gzip).on('error', onError)
.pipe(req);

break;
}
default:
req.write(data);
req.end();

break;
}
}

function readableFromBuffer(buff: string | Buffer): Readable {
const readable = new Readable();
readable.push(buff);
readable.push(null);

return readable;
}

Expand Down
Expand Up @@ -25,6 +25,7 @@ import * as zlib from 'zlib';
import {
CollectorTraceExporter,
CollectorExporterNodeConfigBase,
CompressionAlgorithm,
} from '../../src/platform/node';
import * as collectorTypes from '../../src/types';
import { MockedResponse } from './nodeHelpers';
Expand Down Expand Up @@ -258,7 +259,7 @@ describe('CollectorTraceExporter - node with json over http', () => {
attributes: {},
url: 'http://foo.bar.com',
keepAlive: true,
compress: true,
compression: CompressionAlgorithm.GZIP,
httpAgentOptions: { keepAliveMsecs: 2000 },
};
collectorExporter = new CollectorTraceExporter(collectorExporterConfig);
Expand Down

0 comments on commit 446c71b

Please sign in to comment.